
Federated Learning for Text Generation


This tutorial builds on the concepts in the Federated Learning for Image Classification tutorial, and demonstrates several other useful approaches for federated learning.

In particular, we load a previously trained Keras model, and refine it using federated training on a (simulated) decentralized dataset. This is practically important for several reasons. The ability to use serialized models makes it easy to mix federated learning with other ML approaches. Further, it allows the use of an increasing range of pre-trained models; for example, training language models from scratch is rarely necessary, as many pre-trained models are now widely available (see, e.g., TF Hub). Instead, it makes more sense to start from a pre-trained model and refine it using federated learning, adapting to the particular characteristics of the decentralized data for a particular application.

For this tutorial, we start with an RNN that generates ASCII characters, and refine it via federated learning. We also show how the final weights can be fed back to the original Keras model, allowing easy evaluation and text generation using standard tools.

 
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()
 
import collections
import functools
import os
import time

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

# Test that TFF is working:
tff.federated_computation(lambda: 'Hello, World!')()
 
b'Hello, World!'

Load a pre-trained model

We load a model that was pre-trained following the TensorFlow tutorial Text generation using an RNN with eager execution. However, rather than training on The Complete Works of Shakespeare, we pre-trained the model on the text from Charles Dickens' A Tale of Two Cities and A Christmas Carol.

Other than expanding the vocabulary, we didn't modify the original tutorial, so this initial model isn't state-of-the-art, but it produces reasonable predictions and is sufficient for our tutorial purposes. The final model was saved with tf.keras.models.save_model(include_optimizer=False).
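
For reference, that save call has the following form (the model variable and output path here are illustrative, not the actual ones used to produce the files downloaded below):

# Illustrative only: saving a Keras model without its optimizer state.
tf.keras.models.save_model(
    pretrained_model, '/tmp/dickens_rnn.kerasmodel', include_optimizer=False)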

In this tutorial, we will use federated learning to fine-tune this model for Shakespeare, using a federated version of the data provided by TFF.

Generate the vocab lookup tables

# A fixed vocabulary of ASCII chars that occur in the works of Shakespeare and Dickens:
vocab = list('dhlptx@DHLPTX $(,048cgkoswCGKOSW[_#\'/37;?bfjnrvzBFJNRVZ"&*.26:\naeimquyAEIMQUY]!%)-159\r')

# Creating a mapping from unique characters to indices
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
 

Load the pre-trained model and generate some text

def load_model(batch_size):
  urls = {
      1: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel',
      8: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel'}
  assert batch_size in urls, 'batch_size must be in ' + str(urls.keys())
  url = urls[batch_size]
  local_file = tf.keras.utils.get_file(os.path.basename(url), origin=url)  
  return tf.keras.models.load_model(local_file, compile=False)
 
def generate_text(model, start_string):
  # From https://www.tensorflow.org/tutorials/sequences/text_generation
  num_generate = 200
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)
  text_generated = []
  temperature = 1.0

  model.reset_states()
  for i in range(num_generate):
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(
        predictions, num_samples=1)[-1, 0].numpy()
    input_eval = tf.expand_dims([predicted_id], 0)
    text_generated.append(idx2char[predicted_id])

  return (start_string + ''.join(text_generated))
 
# Text generation requires a batch_size=1 model.
keras_model_batch1 = load_model(batch_size=1)
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
 
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel
16195584/16193984 [==============================] - 0s 0us/step
16203776/16193984 [==============================] - 0s 0us/step
What of TensorFlow Federated, you ask? Sall
yesterday. Received the Bailey."

"Mr. Lorry, grimmering himself, or low varked thends the winter, and the eyes of Monsieur
Defarge. "Let his mind, hon in his
life and message; four declare 

Load and preprocess the federated Shakespeare data

The tff.simulation.datasets package provides a variety of datasets that are split into "clients", where each client corresponds to a dataset on a particular device that might participate in federated learning.

These datasets provide realistic non-IID data distributions that replicate in simulation the challenges of training on real decentralized data. Some of the pre-processing of this data was done using tools from the Leaf project (github).

train_data, test_data = tff.simulation.datasets.shakespeare.load_data()
 

The datasets provided by shakespeare.load_data() consist of a sequence of string Tensors, one for each line spoken by a particular character in a Shakespeare play. The client keys consist of the name of the play joined with the name of the character, so for example MUCH_ADO_ABOUT_NOTHING_OTHELLO corresponds to the lines for the character Othello in the play Much Ado About Nothing. Note that in a real federated learning scenario clients are never identified or tracked by ids, but for simulation it is useful to work with keyed datasets.
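
For a quick check (not part of the original tutorial), we can inspect the available client keys directly via the dataset's client_ids property:

# Peek at a few of the client keys (play name joined with character name).
print(len(train_data.client_ids), 'training clients, for example:')
print(train_data.client_ids[:3])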

Here, for instance, is some of the data from King Lear:

# Here the play is "The Tragedy of King Lear" and the character is "King".
raw_example_dataset = train_data.create_tf_dataset_for_client(
    'THE_TRAGEDY_OF_KING_LEAR_KING')
# To allow for future extensions, each entry x
# is an OrderedDict with a single key 'snippets' which contains the text.
for x in raw_example_dataset.take(2):
  print(x['snippets'])
 
tf.Tensor(b'', shape=(), dtype=string)
tf.Tensor(b'What?', shape=(), dtype=string)

Now we use tf.data.Dataset transformations to prepare this data for training the char RNN loaded above.

# Input pre-processing parameters
SEQ_LENGTH = 100
BATCH_SIZE = 8
BUFFER_SIZE = 100  # For dataset shuffling
 
# Construct a lookup table to map string chars to indexes,
# using the vocab loaded above:
table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        keys=vocab, values=tf.constant(list(range(len(vocab))),
                                       dtype=tf.int64)),
    default_value=0)


def to_ids(x):
  s = tf.reshape(x['snippets'], shape=[1])
  chars = tf.strings.bytes_split(s).values
  ids = table.lookup(chars)
  return ids


def split_input_target(chunk):
  input_text = tf.map_fn(lambda x: x[:-1], chunk)
  target_text = tf.map_fn(lambda x: x[1:], chunk)
  return (input_text, target_text)


def preprocess(dataset):
  return (
      # Map ASCII chars to int64 indexes using the vocab
      dataset.map(to_ids)
      # Split into individual chars
      .unbatch()
      # Form example sequences of SEQ_LENGTH + 1
      .batch(SEQ_LENGTH + 1, drop_remainder=True)
      # Shuffle and form minibatches
      .shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
      # And finally split into (input, target) tuples,
      # each of length SEQ_LENGTH.
      .map(split_input_target))
 

Note that in forming the original sequences and in forming the batches above, we use drop_remainder=True for simplicity. This means that any characters (clients) that don't have at least (SEQ_LENGTH + 1) * BATCH_SIZE characters of text will have empty datasets. A typical approach to address this would be to pad the batches with a special token, and then mask the loss to not take the padding tokens into account.

That would complicate the example somewhat, so for this tutorial we only use full batches, as in the standard tutorial. However, in the federated setting this issue is more significant, because many users might have small datasets.
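
For reference, a minimal sketch of the padding-and-masking approach might look like the following (an illustrative assumption, not code used in this tutorial; pad_id and the helper names are hypothetical, and a real implementation would add a dedicated padding token to the vocab rather than reusing index 0):

# Illustrative sketch only (not used below).
pad_id = 0  # Hypothetical padding id; a real vocab would reserve one.

def pad_to_full_length(ids):
  # Pad a too-short sequence of char ids up to SEQ_LENGTH + 1 entries.
  pad_amount = tf.maximum(SEQ_LENGTH + 1 - tf.shape(ids)[0], 0)
  return tf.pad(ids, [[0, pad_amount]], constant_values=pad_id)

def masked_loss(y_true, y_pred):
  # Cross-entropy that ignores positions holding the padding token.
  mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
  per_char = tf.keras.losses.sparse_categorical_crossentropy(
      y_true, y_pred, from_logits=True)
  return tf.reduce_sum(per_char * mask) / tf.maximum(tf.reduce_sum(mask), 1.0)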

Now we can preprocess our raw_example_dataset, and check the types:

example_dataset = preprocess(raw_example_dataset)
print(example_dataset.element_spec)
 
(TensorSpec(shape=(8, 100), dtype=tf.int64, name=None), TensorSpec(shape=(8, 100), dtype=tf.int64, name=None))

Compile the model and test on the preprocessed data

We loaded an uncompiled Keras model, but in order to run keras_model.evaluate, we need to compile it with a loss and metrics. The on-device (client) optimizer used in federated learning is specified separately below, when we build the federated averaging process.

The original tutorial didn't have char-level accuracy (the fraction of predictions where the highest probability was placed on the correct next char). This is a useful metric, so we add it. However, we need to define a new metric class for this, because our predictions have rank 3 (a vector of logits for each of the BATCH_SIZE * SEQ_LENGTH predictions), and SparseCategoricalAccuracy expects only rank 2 predictions.

class FlattenedCategoricalAccuracy(tf.keras.metrics.SparseCategoricalAccuracy):

  def __init__(self, name='accuracy', dtype=tf.float32):
    super().__init__(name, dtype=dtype)

  def update_state(self, y_true, y_pred, sample_weight=None):
    y_true = tf.reshape(y_true, [-1, 1])
    y_pred = tf.reshape(y_pred, [-1, len(vocab), 1])
    return super().update_state(y_true, y_pred, sample_weight)
 

Now we can compile a model, and evaluate it on our example_dataset.

BATCH_SIZE = 8  # The training and eval batch size for the rest of this tutorial.
keras_model = load_model(batch_size=BATCH_SIZE)
keras_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[FlattenedCategoricalAccuracy()])

# Confirm that loss is much lower on Shakespeare than on random data
loss, accuracy = keras_model.evaluate(example_dataset.take(5), verbose=0)
print(
    'Evaluating on an example Shakespeare character: {a:3f}'.format(a=accuracy))

# As a sanity check, we can construct some completely random data, where we expect
# the accuracy to be essentially random:
random_guessed_accuracy = 1.0 / len(vocab)
print('Expected accuracy for random guessing: {a:.3f}'.format(
    a=random_guessed_accuracy))
random_indexes = np.random.randint(
    low=0, high=len(vocab), size=1 * BATCH_SIZE * (SEQ_LENGTH + 1))
data = collections.OrderedDict(
    snippets=tf.constant(
        ''.join(np.array(vocab)[random_indexes]), shape=[1, 1]))
random_dataset = preprocess(tf.data.Dataset.from_tensor_slices(data))
loss, accuracy = keras_model.evaluate(random_dataset, steps=10, verbose=0)
print('Evaluating on completely random data: {a:.3f}'.format(a=accuracy))
 
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel
16195584/16193984 [==============================] - 0s 0us/step
16203776/16193984 [==============================] - 0s 0us/step
Evaluating on an example Shakespeare character: 0.402000
Expected accuracy for random guessing: 0.012
Evaluating on completely random data: 0.011

Fine-tune the model with federated learning

TFF serializes all TensorFlow computations so they can potentially be run in a non-Python environment (even though at the moment only a simulation runtime implemented in Python is available). Even though we are running in eager mode (TF 2.0), currently TFF serializes TensorFlow computations by constructing the necessary ops inside the context of a "with tf.Graph.as_default()" statement. Thus, we need to provide a function that TFF can use to introduce our model into a graph it controls. We do this as follows:

# Clone the keras_model inside `create_tff_model()`, which TFF will
# call to produce a new copy of the model inside the graph that it will 
# serialize. Note: we want to construct all the necessary objects we'll need 
# _inside_ this method.
def create_tff_model():
  # TFF uses an `input_spec` so it knows the types and shapes
  # that your model expects.
  input_spec = example_dataset.element_spec
  keras_model_clone = tf.keras.models.clone_model(keras_model)
  return tff.learning.from_keras_model(
      keras_model_clone,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[FlattenedCategoricalAccuracy()])
 

Now we are ready to construct a Federated Averaging iterative process, which we will use to improve the model (for details on the Federated Averaging algorithm, see the paper Communication-Efficient Learning of Deep Networks from Decentralized Data).
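
In brief, each round of Federated Averaging broadcasts the current model weights to the participating clients, each client runs several steps of local SGD on its own data, and the server then replaces the global weights with the example-weighted average of the client weights, w ← Σ_k (n_k / n) w_k, where n_k is the number of examples on client k and n = Σ_k n_k.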

We use a compiled Keras model to perform standard (non-federated) evaluation after each round of federated training. This is useful for research purposes when doing simulated federated learning and there is a standard test dataset.

In a realistic production setting this same technique might be used to take models trained with federated learning and evaluate them on a centralized benchmark dataset for testing or quality-assurance purposes.

# This command builds all the TensorFlow graphs and serializes them:
fed_avg = tff.learning.build_federated_averaging_process(
    model_fn=create_tff_model,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(lr=0.5))
 

Here is the simplest possible loop, where we run federated averaging for one round on a single client:

state = fed_avg.initialize()
state, metrics = fed_avg.next(state, [example_dataset.take(5)])
train_metrics = metrics['train']
print('loss={l:.3f}, accuracy={a:.3f}'.format(
    l=train_metrics['loss'], a=train_metrics['accuracy']))
 
loss=4.403, accuracy=0.132

Now let's write a slightly more interesting training and evaluation loop.

So that this simulation still runs relatively quickly, we train on the same two clients each round, considering only five minibatches for each.

def data(client, source=train_data):
  return preprocess(source.create_tf_dataset_for_client(client)).take(5)


clients = [
    'ALL_S_WELL_THAT_ENDS_WELL_CELIA', 'MUCH_ADO_ABOUT_NOTHING_OTHELLO',
]

train_datasets = [data(client) for client in clients]

# We concatenate the test datasets for evaluation with Keras by creating a 
# Dataset of Datasets, and then identity flat mapping across all the examples.
test_dataset = tf.data.Dataset.from_tensor_slices(
    [data(client, test_data) for client in clients]).flat_map(lambda x: x)
 

The initial state of the model produced by fed_avg.initialize() is based on the random initializers for the Keras model, not the weights that were loaded, since clone_model() does not clone the weights. To start training from the pre-trained model, we set the model weights in the server state directly from the loaded model.

NUM_ROUNDS = 5

# The state of the FL server, containing the model and optimization state.
state = fed_avg.initialize()

# Load our pre-trained Keras model weights into the global model state.
state = tff.learning.state_with_new_model_weights(
    state,
    trainable_weights=[v.numpy() for v in keras_model.trainable_weights],
    non_trainable_weights=[
        v.numpy() for v in keras_model.non_trainable_weights
    ])


def keras_evaluate(state, round_num):
  # Take our global model weights and push them back into a Keras model to
  # use its standard `.evaluate()` method.
  keras_model = load_model(batch_size=BATCH_SIZE)
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[FlattenedCategoricalAccuracy()])
  tff.learning.assign_weights_to_keras_model(keras_model, state.model)
  loss, accuracy = keras_model.evaluate(example_dataset, steps=2, verbose=0)
  print('\tEval: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))


for round_num in range(NUM_ROUNDS):
  print('Round {r}'.format(r=round_num))
  keras_evaluate(state, round_num)
  state, metrics = fed_avg.next(state, train_datasets)
  train_metrics = metrics['train']
  print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(
      l=train_metrics['loss'], a=train_metrics['accuracy']))

print('Final evaluation')
keras_evaluate(state, NUM_ROUNDS + 1)
 
Round 0
    Eval: loss=3.324, accuracy=0.401
    Train: loss=4.360, accuracy=0.155
Round 1
    Eval: loss=4.361, accuracy=0.049
    Train: loss=4.235, accuracy=0.164
Round 2
    Eval: loss=4.219, accuracy=0.177
    Train: loss=4.081, accuracy=0.221
Round 3
    Eval: loss=4.080, accuracy=0.174
    Train: loss=3.940, accuracy=0.226
Round 4
    Eval: loss=3.991, accuracy=0.176
    Train: loss=3.840, accuracy=0.226
Final evaluation
    Eval: loss=3.909, accuracy=0.171

With the default changes, we haven't done enough training to make a big difference, but if you train longer on more Shakespeare data, you should see a difference in the style of the text generated with the updated model:

# Set our newly trained weights back in the originally created model.
keras_model_batch1.set_weights([v.numpy() for v in keras_model.weights])
# Text generation requires batch_size=1
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
 
What of TensorFlow Federated, you ask? Shalways, I will call your
compet with any city brought their faces uncompany," besumed him. "When he
sticked Madame Defarge pushed the lamps.

"Have I often but no unison. She had probably come, 

Suggested extensions

This tutorial is just the first step! Here are some ideas for how you might try extending this notebook:

  • Write a more realistic training loop where you sample the clients to train on randomly (see the sketch after this list).
  • Use .repeat(NUM_EPOCHS) on the client datasets to try multiple epochs of local training (e.g., as in McMahan et al.). See also Federated Learning for Image Classification, which does this.
  • Change the compile() command to experiment with different optimization algorithms on the clients.
  • Try the server_optimizer argument to build_federated_averaging_process to try different algorithms for applying the model updates on the server.
  • Try the client_weight_fn argument to build_federated_averaging_process to try different weightings of the clients. The default weights client updates by the number of examples on the client, but you can do e.g. client_weight_fn=lambda _: tf.constant(1.0).
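
For the first suggestion, a minimal sketch of per-round random client sampling might look like the following (an illustrative assumption, not part of the tutorial above; NUM_CLIENTS_PER_ROUND is a hypothetical parameter, and note that, as discussed earlier, some sampled clients may end up with empty datasets after preprocessing):

# Illustrative sketch: sample a fresh subset of clients each round,
# continuing from the `state` trained above.
NUM_CLIENTS_PER_ROUND = 3  # Hypothetical round size.

for round_num in range(NUM_ROUNDS):
  sampled_ids = np.random.choice(
      train_data.client_ids, size=NUM_CLIENTS_PER_ROUND, replace=False)
  sampled_datasets = [data(client_id) for client_id in sampled_ids]
  state, metrics = fed_avg.next(state, sampled_datasets)
  print('Round {r}: loss={l:.3f}'.format(
      r=round_num, l=metrics['train']['loss']))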