Help protect the Great Barrier Reef with TensorFlow on Kaggle Join Challenge

文本生成联合学习

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 中查看源代码

:本 Colab 已通过验证,可与最新发布版本tensorflow_federated pip 软件包一起使用,但 Tensorflow Federated 项目仍处于预发布开发阶段,可能无法在 master 上运行。

本教程以“图像分类联合学习”教程中的概念为基础,演示了联合学习的其他几种实用方法。

特别是,我们加载了先前训练的 Keras 模型,并使用基于(模拟)分散数据集的联合训练对其进行优化。这一操作非常重要,原因有几点。能够使用序列化模型,便可方便地将联合学习与其他机器学习方法混合使用。此外,该操作可用的预训练模型范围也在不断扩大——例如,由于预训练模型现已广泛可用(请参见 TF Hub 等库),因此基本不需要从头开始训练语言模型。取而代之的有效方式是,从预训练模型开始,使用联合学习对其进行优化,以适应特定应用分散数据的特定特征。

在本教程中,我们将从能够生成 ASCII 字符的 RNN 开始,通过联合学习对其进行优化。我们还将展示如何将最终权重反馈给原始 Keras 模型,从而简化使用标准工具进行评估和生成文本的工作。

pip install --quiet --upgrade tensorflow_federated
import collections
import functools
import os
import time

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

# Test the TFF is working:
tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

加载预训练模型

我们加载的模型根据 TensorFlow 教程“使用 RNN 通过 Eager Execution 生成文本”进行了预训练。但是,我们没有使用《莎士比亚全集》作为数据集,而是基于查尔斯·狄更斯的《双城记》和《圣诞颂歌》中的文本对模型进行预训练。

除扩大了词汇量,我们并没有修改原始教程,所以这一初始模型不是最先进的模型,但它可以产生合理的预测,足以满足我们的教学目的。最终模型使用 tf.keras.models.save_model(include_optimizer=False) 保存。

在本教程中,我们将使用 TFF 提供的联合版本数据,通过联合学习针对莎士比亚作品微调此模型。

生成词汇查找表

# A fixed vocabularly of ASCII chars that occur in the works of Shakespeare and Dickens:
vocab = list('dhlptx@DHLPTX $(,048cgkoswCGKOSW[_#\'/37;?bfjnrvzBFJNRVZ"&*.26:\naeimquyAEIMQUY]!%)-159\r')

# Creating a mapping from unique characters to indices
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

加载预训练模型并生成一些文本

def load_model(batch_size):
  urls = {
      1: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel',
      8: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel'}
  assert batch_size in urls, 'batch_size must be in ' + str(urls.keys())
  url = urls[batch_size]
  local_file = tf.keras.utils.get_file(os.path.basename(url), origin=url)  
  return tf.keras.models.load_model(local_file, compile=False)
def generate_text(model, start_string):
  # From https://tensorflow.google.cn/tutorials/sequences/text_generation
  num_generate = 200
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)
  text_generated = []
  temperature = 1.0

  model.reset_states()
  for i in range(num_generate):
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(
        predictions, num_samples=1)[-1, 0].numpy()
    input_eval = tf.expand_dims([predicted_id], 0)
    text_generated.append(idx2char[predicted_id])

  return (start_string + ''.join(text_generated))
# Text generation requires a batch_size=1 model.
keras_model_batch1 = load_model(batch_size=1)
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel
16195584/16193984 [==============================] - 0s 0us/step
16203776/16193984 [==============================] - 0s 0us/step
What of TensorFlow Federated, you ask? Sall
yesterday. Received the Bailey."

"Mr. Lorry, grimmering himself, or low varked thends the winter, and the eyes of Monsieur
Defarge. "Let his mind, hon in his
life and message; four declare

加载并预处理联合莎士比亚数据

tff.simulation.datasets 软件包提供了各种数据集,这些数据集被拆分成“客户端”,其中每个客户端对应于可能参与联合学习的特定设备上的数据集。

这些数据集提供了真实的非独立同分布数据,可在模拟过程中复制基于真实分散数据进行训练的挑战。这些数据的部分预处理是使用 Leaf 项目 (GitHub) 中的工具完成的。

train_data, test_data = tff.simulation.datasets.shakespeare.load_data()

shakespeare.load_data() 提供的数据集由一系列字符串 Tensors 构成,一个字符串代表莎士比亚戏剧中特定角色的一句台词。客户端键由戏剧名和参演角色名构成,例如 MUCH_ADO_ABOUT_NOTHING_OTHELLO 即对应于 Othello(奥赛罗)角色在戏剧 Much Ado About Nothing(《无事生非》)中的台词。请注意,在真实的联合学习场景中,并不会通过 ID 来标识或跟踪客户端,但对于模拟而言,使用键控数据集则非常实用。

例如,我们可以查看 King Lear(《李尔王》)的如下数据:

# Here the play is "The Tragedy of King Lear" and the character is "King".
raw_example_dataset = train_data.create_tf_dataset_for_client(
    'THE_TRAGEDY_OF_KING_LEAR_KING')
# To allow for future extensions, each entry x
# is an OrderedDict with a single key 'snippets' which contains the text.
for x in raw_example_dataset.take(2):
  print(x['snippets'])
tf.Tensor(b'', shape=(), dtype=string)
tf.Tensor(b'What?', shape=(), dtype=string)

现在,我们使用 tf.data.Dataset 转换来准备此数据,用于训练上面加载的字符 RNN。

# Input pre-processing parameters
SEQ_LENGTH = 100
BATCH_SIZE = 8
BUFFER_SIZE = 100  # For dataset shuffling
# Construct a lookup table to map string chars to indexes,
# using the vocab loaded above:
table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        keys=vocab, values=tf.constant(list(range(len(vocab))),
                                       dtype=tf.int64)),
    default_value=0)


def to_ids(x):
  s = tf.reshape(x['snippets'], shape=[1])
  chars = tf.strings.bytes_split(s).values
  ids = table.lookup(chars)
  return ids


def split_input_target(chunk):
  input_text = tf.map_fn(lambda x: x[:-1], chunk)
  target_text = tf.map_fn(lambda x: x[1:], chunk)
  return (input_text, target_text)


def preprocess(dataset):
  return (
      # Map ASCII chars to int64 indexes using the vocab
      dataset.map(to_ids)
      # Split into individual chars
      .unbatch()
      # Form example sequences of SEQ_LENGTH +1
      .batch(SEQ_LENGTH + 1, drop_remainder=True)
      # Shuffle and form minibatches
      .shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
      # And finally split into (input, target) tuples,
      # each of length SEQ_LENGTH.
      .map(split_input_target))

请注意,在形成原始序列和形成上述批次时,为简单起见,我们使用 drop_remainder=True。这意味着文本字符数低于 (SEQ_LENGTH + 1) * BATCH_SIZE 的任何角色(客户端)的数据集都将为空。解决此问题的典型方法是使用特殊词例填充批次,然后遮盖损失以忽略填充词例。

这会使样本变得有些复杂,所以我们在本教程中仅使用标准教程所介绍的完整批次。但在联合环境中,此问题将更为严重,因为可能会有许多用户使用较小的数据集。

现在,我们可以预处理我们的 raw_example_dataset,并检查类型:

example_dataset = preprocess(raw_example_dataset)
print(example_dataset.element_spec)
(TensorSpec(shape=(8, 100), dtype=tf.int64, name=None), TensorSpec(shape=(8, 100), dtype=tf.int64, name=None))

编译模型并基于预处理的数据进行测试

我们加载了未编译的 Keras 模型,但为了运行 keras_model.evaluate,我们需要使用损失和指标对其进行编译。我们还将在一个优化器中编译,该优化器将用作联合学习的设备端优化器。

原始教程并不具备字符级准确率(在预测中,最高概率能够落于正确的下一字符的部分)。这是一项有用的指标,因此我们添加了该指标。但是,我们需要为此定义一个新的指标类,因为我们的预测的秩为 3(每个 BATCH_SIZE * SEQ_LENGTH 预测的 logits 的向量),而 SparseCategoricalAccuracy 仅期望秩为 2 的预测。

class FlattenedCategoricalAccuracy(tf.keras.metrics.SparseCategoricalAccuracy):

  def __init__(self, name='accuracy', dtype=tf.float32):
    super().__init__(name, dtype=dtype)

  def update_state(self, y_true, y_pred, sample_weight=None):
    y_true = tf.reshape(y_true, [-1, 1])
    y_pred = tf.reshape(y_pred, [-1, len(vocab), 1])
    return super().update_state(y_true, y_pred, sample_weight)

现在,我们可以编译模型,并基于我们的 example_dataset 对其进行评估。

BATCH_SIZE = 8  # The training and eval batch size for the rest of this tutorial.
keras_model = load_model(batch_size=BATCH_SIZE)
keras_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[FlattenedCategoricalAccuracy()])

# Confirm that loss is much lower on Shakespeare than on random data
loss, accuracy = keras_model.evaluate(example_dataset.take(5), verbose=0)
print(
    'Evaluating on an example Shakespeare character: {a:3f}'.format(a=accuracy))

# As a sanity check, we can construct some completely random data, where we expect
# the accuracy to be essentially random:
random_guessed_accuracy = 1.0 / len(vocab)
print('Expected accuracy for random guessing: {a:.3f}'.format(
    a=random_guessed_accuracy))
random_indexes = np.random.randint(
    low=0, high=len(vocab), size=1 * BATCH_SIZE * (SEQ_LENGTH + 1))
data = collections.OrderedDict(
    snippets=tf.constant(
        ''.join(np.array(vocab)[random_indexes]), shape=[1, 1]))
random_dataset = preprocess(tf.data.Dataset.from_tensor_slices(data))
loss, accuracy = keras_model.evaluate(random_dataset, steps=10, verbose=0)
print('Evaluating on completely random data: {a:.3f}'.format(a=accuracy))
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel
16195584/16193984 [==============================] - 0s 0us/step
16203776/16193984 [==============================] - 0s 0us/step
Evaluating on an example Shakespeare character: 0.402000
Expected accuracy for random guessing: 0.012
Evaluating on completely random data: 0.011

通过联合学习微调模型

TFF 会序列化所有 TensorFlow 计算,因此它们有可能会在非 Python 环境中运行(虽然目前只有由 Python 实现的模拟运行时可用)。即使我们以 Eager 模式 (TF 2.0) 运行,当前 TFF 也能够通过在“with tf.Graph.as_default()”语句的上下文中构造必要的运算来序列化 TensorFlow 计算。因此,我们需要提供一个函数,供 TFF 将模型引入其控制的计算图中。我们的做法如下:

# Clone the keras_model inside `create_tff_model()`, which TFF will
# call to produce a new copy of the model inside the graph that it will 
# serialize. Note: we want to construct all the necessary objects we'll need 
# _inside_ this method.
def create_tff_model():
  # TFF uses an `input_spec` so it knows the types and shapes
  # that your model expects.
  input_spec = example_dataset.element_spec
  keras_model_clone = tf.keras.models.clone_model(keras_model)
  return tff.learning.from_keras_model(
      keras_model_clone,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[FlattenedCategoricalAccuracy()])

现在,我们准备构造一个联合平均迭代过程,用于改进模型(有关联合平均算法的详细信息,请参阅论文《Communication-Efficient Learning of Deep Networks from Decentralized Data》)。

在每轮联合训练之后,我们使用编译的 Keras 模型执行标准(非联合)评估。在进行模拟联合学习且存在标准测试数据集时,这种操作对于研究目的而言非常实用。

在实际的生产环境中,可以使用相同的技术通过联合学习来训练模型,并基于集中式基准数据集对模型进行评估,供测试或质量保证之用。

# This command builds all the TensorFlow graphs and serializes them: 
fed_avg = tff.learning.build_federated_averaging_process(
    model_fn=create_tff_model,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(lr=0.5))

以下为最简单的循环,在此循环中,我们在单个批次的单个客户端上运行一轮联合平均:

state = fed_avg.initialize()
state, metrics = fed_avg.next(state, [example_dataset.take(5)])
print('loss={l:.3f}, accuracy={a:.3f}'.format(
    l=metrics.train.loss, a=metrics.train.accuracy))
loss=4.403, accuracy=0.132

现在,让我们编写一个更为有趣的训练和评估循环。

为了使此模拟仍能相对较快地运行,我们每轮训练三个相同的客户端,每个客户端仅考虑两个迷你批次。

def data(client, source=train_data):
  return preprocess(source.create_tf_dataset_for_client(client)).take(5)


clients = [
    'ALL_S_WELL_THAT_ENDS_WELL_CELIA', 'MUCH_ADO_ABOUT_NOTHING_OTHELLO',
]

train_datasets = [data(client) for client in clients]

# We concatenate the test datasets for evaluation with Keras by creating a 
# Dataset of Datasets, and then identity flat mapping across all the examples.
test_dataset = tf.data.Dataset.from_tensor_slices(
    [data(client, test_data) for client in clients]).flat_map(lambda x: x)

fed_avg.initialize() 生成的模型的初始状态基于 Keras 模型的随机初始值设定项,而非加载的权重,因为 clone_model() 不会克隆权重。要从预训练模型开始训练,我们直接使用加载的模型在服务器状态下设置模型权重。

NUM_ROUNDS = 5

# The state of the FL server, containing the model and optimization state.
state = fed_avg.initialize()

state = tff.learning.state_with_new_model_weights(
    state,
    trainable_weights=[v.numpy() for v in keras_model.trainable_weights],
    non_trainable_weights=[
        v.numpy() for v in keras_model.non_trainable_weights
    ])


def keras_evaluate(state, round_num):
  # Take our global model weights and push them back into a Keras model to
  # use its standard `.evaluate()` method.
  keras_model = load_model(batch_size=BATCH_SIZE)
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[FlattenedCategoricalAccuracy()])
  tff.learning.assign_weights_to_keras_model(keras_model, state.model)
  loss, accuracy = keras_model.evaluate(example_dataset, steps=2, verbose=0)
  print('\tEval: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))

for round_num in range(NUM_ROUNDS):
  print('Round {r}'.format(r=round_num))
  keras_evaluate(state, round_num)
  state, metrics = fed_avg.next(state, train_datasets)
  print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(
      l=metrics.train.loss, a=metrics.train.accuracy))

keras_evaluate(state, NUM_ROUNDS + 1)
Round 0
    Eval: loss=3.324, accuracy=0.401
    Train: loss=4.360, accuracy=0.155
Round 1
    Eval: loss=4.361, accuracy=0.049
    Train: loss=4.235, accuracy=0.164
Round 2
    Eval: loss=4.219, accuracy=0.177
    Train: loss=4.081, accuracy=0.221
Round 3
    Eval: loss=4.080, accuracy=0.174
    Train: loss=3.940, accuracy=0.226
Round 4
    Eval: loss=3.991, accuracy=0.176
    Train: loss=3.840, accuracy=0.226
Final evaluation
    Eval: loss=3.909, accuracy=0.171

我们仅做默认更改,没有进行足够的训练来实现大幅调整,但是如果您使用更大量的莎士比亚数据进行更长时间的训练,那么您应该会看到更新后的模型所生成的文本风格会有所不同:

# Set our newly trained weights back in the originally created model.
keras_model_batch1.set_weights([v.numpy() for v in keras_model.weights])
# Text generation requires batch_size=1
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
What of TensorFlow Federated, you ask? Shalways, I will call your
compet with any city brought their faces uncompany," besumed him. "When he
sticked Madame Defarge pushed the lamps.

"Have I often but no unison. She had probably come,

建议的扩展学习

本教程只是第一步!以下是有关如何扩展此笔记本的一些想法:

  • 编写一个更为真实的训练循环,对客户端进行抽样以实现随机训练。
  • 基于客户端数据集使用“.repeat(NUM_EPOCHS)”尝试多个周期的本地训练(例如,McMahan 等人所述方法)。另请参阅图像分类联合学习,其中提供了相关内容。
  • 更改 compile() 命令以在客户端上尝试使用不同的优化算法。
  • 尝试针对 build_federated_averaging_process 使用 server_optimizer 参数以尝试在服务器上应用模型更新的不同算法。
  • 尝试针对 build_federated_averaging_process 使用 client_weight_fn 参数数以尝试不同的客户端权重。默认权重客户端会根据客户端上的样本量进行更新,但是您可以执行以下操作:client_weight_fn=lambda _: tf.constant(1.0)