# 教程大纲

1. 加载 TFF 库。
2. 探索/预处理联合 EMNIST 数据集。
3. 创建一个模型。
4. 为训练建立联合平均过程。
5. 分析训练指标。
6. 设置联合评估计算。
7. 分析评估指标。

## 准备输入数据

``````# Code for loading federated data from TFF repository
``````

`load_data()` 返回的数据集是 `tff.simulation.datasets.ClientData` 的实例，后者是一个允许您枚举用户集、构造表示特定用户的数据的 `tf.data.Dataset` 以及查询各个元素的结构的接口。

``````len(emnist_train.client_ids)
``````
``````# Let's look at the shape of our data
example_dataset = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0])

example_dataset.element_spec
``````
``````# Let's select an example dataset from one of our simulated clients
example_dataset = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0])

# Your code to get an example element from one client:
example_element = next(iter(example_dataset))

example_element['label'].numpy()
``````
``````plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()
``````

``````## Example MNIST digits for one client
f = plt.figure(figsize=(20,4))
j = 0

for e in example_dataset.take(40):
plt.subplot(4, 10, j+1)
plt.imshow(e['pixels'].numpy(), cmap='gray', aspect='equal')
plt.axis('off')
j += 1
``````
``````# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12,7))
f.suptitle("Label Counts for a Sample of Clients")
for i in range(6):
ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i])
k = collections.defaultdict(list)
for e in ds:
k[e['label'].numpy()].append(e['label'].numpy())
plt.subplot(2, 3, i+1)
plt.title("Client {}".format(i))
for j in range(10):
plt.hist(k[j], density=False, bins=[0,1,2,3,4,5,6,7,8,9,10])
``````
``````# Let's play around with the emnist_train dataset.
# Let's explore the non-iid charateristic of the example data.

for i in range(5):
ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i])
k = collections.defaultdict(list)
for e in ds:
k[e['label'].numpy()].append(e['pixels'].numpy())
f = plt.figure(i, figsize=(12,5))
f.suptitle("Client #{}'s Mean Image Per Label".format(i))
for j in range(10):
mn_img = np.mean(k[j],0)
plt.subplot(2, 5, j+1)
plt.imshow(mn_img.reshape((28,28)))#,cmap='gray')
plt.axis('off')

# Each client has different mean images -- each client will be nudging the model
# in their own directions.
``````

### 预处理数据

``````NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER=10

def preprocess(dataset):

def batch_format_fn(element):
"""Flatten a batch `pixels` and return the features as an `OrderedDict`."""
return collections.OrderedDict(
x=tf.reshape(element['pixels'], [-1, 784]),
y=tf.reshape(element['label'], [-1, 1]))

return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)
``````

``````preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
next(iter(preprocessed_example_dataset)))

sample_batch
``````

``````def make_federated_data(client_data, client_ids):
return [
preprocess(client_data.create_tf_dataset_for_client(x))
for x in client_ids
]
``````

``````sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

# Your code to get the federated dataset here for the sampled clients:
federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
``````

## 使用 Keras 创建模型

``````def create_keras_model():
return tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
tf.keras.layers.Softmax(),
])
``````

``````## Centralized training with keras ---------------------------------------------

# This is separate from the TFF tutorial, and demonstrates how to train a
# Keras model in a centralized fashion (contrasting training in a federated env)
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Preprocess the data (these are NumPy arrays)
x_train = x_train.reshape(60000, 784).astype("float32") / 255

y_train = y_train.astype("float32")

mod = create_keras_model()
mod.compile(
optimizer=tf.keras.optimizers.RMSprop(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
)
h = mod.fit(
x_train,
y_train,
batch_size=64,
epochs=2
)

# ------------------------------------------------------------------------------
``````

``````def model_fn():
# We _must_ create a new model here, and _not_ capture it from an external
# scope. TFF will call this within different graph contexts.
keras_model = create_keras_model()
return tff.learning.from_keras_model(
keras_model,
input_spec=preprocessed_example_dataset.element_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
``````

## 基于联合数据训练模型

``````iterative_process = tff.learning.build_federated_averaging_process(
model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
``````

``````def initialize():
...

def next(state):
...

iterative_process = IterativeProcess(initialize, next)
state = iterative_process.initialize()
for round in range(num_rounds):
state = iterative_process.next(state)
``````

``````state = iterative_process.initialize()
``````

``````# Run one single round of training.
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics['train']))
``````

``````NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
state, metrics = iterative_process.next(state, federated_train_data)
print('round {:2d}, metrics={}'.format(round_num, metrics['train']))
``````

## 在 TensorBoard 中显示模型指标

接下来，我们使用 TensorBoard 呈现来自这些联合计算的指标。

``````import os
import shutil

logdir = "/tmp/logs/scalars/training/"
if os.path.exists(logdir):
shutil.rmtree(logdir)

# Your code to create a summary writer:
summary_writer = tf.summary.create_file_writer(logdir)

state = iterative_process.initialize()
``````

``````with summary_writer.as_default():
for round_num in range(1, NUM_ROUNDS):
state, metrics = iterative_process.next(state, federated_train_data)
for name, value in metrics['train'].items():
tf.summary.scalar(name, value, step=round_num)
``````

``````%tensorboard --logdir /tmp/logs/scalars/ --port=0
``````

## 评估

``````# Construct federated evaluation computation here:
evaluation = tff.learning.build_federated_evaluation(model_fn)
``````

``````import random
shuffled_ids = emnist_test.client_ids.copy()
random.shuffle(shuffled_ids)
sample_clients = shuffled_ids[0:NUM_CLIENTS]

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
``````
``````# Run evaluation on the test data here, using the federated model produced from
# training:
test_metrics = evaluation(state.model, federated_test_data)
``````
``````str(test_metrics)
``````

# 构建您自己的 FL 算法

• 了解联合学习算法的一般结构。
• 探索 TFF 的 Federated Core
• 使用 Federated Core 直接实现联合平均。

## 准备输入数据

``````emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
``````
``````NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

def batch_format_fn(element):
"""Flatten a batch of EMNIST data and return a (features, label) tuple."""
return (tf.reshape(element['pixels'], [-1, 784]),
tf.reshape(element['label'], [-1, 1]))

return dataset.batch(BATCH_SIZE).map(batch_format_fn)
``````
``````client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False)

federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
for x in client_ids
]
``````

## 准备模型

``````def create_keras_model():
return tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
tf.keras.layers.Softmax(),
])
``````

``````def model_fn():
keras_model = create_keras_model()
return tff.learning.from_keras_model(
keras_model,
input_spec=federated_train_data[0].element_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
``````

# 自定义 FL 算法

1. 服务器到客户端的广播步骤。
2. 本地客户端更新步骤。
3. 客户端到服务器的上传步骤。
4. 服务器更新步骤。

``````def initialize_fn():
model = model_fn()
return model.weights.trainable
``````

``````def next_fn(server_weights, federated_dataset):
# Broadcast the server weights to the clients.

# Each client computes their updated weights.
client_weights = client_update(federated_dataset, server_weights_at_client)

# The server averages these updates.
mean_client_weights = mean(client_weights)

# The server updates its model.
server_weights = server_update(mean_client_weights)

return server_weights
``````

## TensorFlow 块

### 客户端更新

• `trainable`：与可训练层对应的张量列表。
• `non_trainable`：与不可训练层对应的张量列表。

``````@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
"""Performs training (using the server model weights) on the client's dataset."""
# Initialize the client model with the current server weights.
client_weights = model.weights.trainable
# Assign the server weights to the client model.
tf.nest.map_structure(lambda x, y: x.assign(y),
client_weights, server_weights)

# Use the client_optimizer to update the local model.
for batch in dataset:
# Compute a forward pass on the batch of data
outputs = model.forward_pass(batch)

# Apply the gradient using a client optimizer.

return client_weights
``````

### 服务器更新

``````@tf.function
def server_update(model, mean_client_weights):
"""Updates the server model weights as the average of the client model weights."""
model_weights = model.weights.trainable
# Assign the mean client weights to the server model.
tf.nest.map_structure(lambda x, y: x.assign(y),
model_weights, mean_client_weights)
return model_weights
``````

# Federated Core 简介

Federated Core (FC) 是一组用作 `tff.learning` API 基础的低级接口。不过，这些接口不仅限于学习。事实上，它们可用于对分布式数据进行分析和许多其他计算。

## 联合数据

``````federated_float_on_clients = tff.type_at_clients(tf.float32)
``````

``````str(federated_float_on_clients)
``````

TFF 关注三个信息：数据、数据放置的位置以及数据如何转换。前两个封装在联合类型中，而最后一个封装在联合计算中。

## 联合计算

TFF 是一种强类型函数式编程环境，其基本单元是联合计算。这些单元是接受联合值作为输入并返回联合值作为输出的逻辑片段。

``````@tff.federated_computation(tff.type_at_clients(tf.float32))
def get_average_temperature(client_temperatures):
return tff.federated_mean(client_temperatures)
``````

``````str(get_average_temperature.type_signature)
``````

`tff.federated_computation` 接受联合类型 `<float>@CLIENTS` 的参数，并返回联合类型 `<float>@SERVER` 的值。联合计算可以从服务器到客户端、从客户端到客户端或者从服务器到服务器。另外，联合计算的构成也可以像普通函数一样，只要它们的类型签名匹配即可。

``````get_average_temperature([68.5, 70.3, 69.8])
``````

## 非 Eager 计算和 TensorFlow

``````@tff.tf_computation(tf.float32)
``````

``````str(add_half.type_signature)
``````

``````@tff.federated_computation(tff.type_at_clients(tf.float32))
``````

``````str(add_half_on_clients.type_signature)
``````

# 构建您自己的 FL 算法（第 2 部分）

## TensorFlow 联合块

### 创建初始化计算

``````@tff.tf_computation
def server_init():
model = model_fn()
return model.weights.trainable
``````

``````@tff.federated_computation
def initialize_fn():
return tff.federated_value(server_init(), tff.SERVER)
``````

### 创建 `next_fn`

``````whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
``````

``````str(tf_dataset_type)
``````

``````model_weights_type = server_init.type_signature.result
``````

``````str(model_weights_type)
``````

``````@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
model = model_fn()
client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
return client_update(model, tf_dataset, server_weights, client_optimizer)
``````

``````@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
model = model_fn()
return server_update(model, mean_client_weights)
``````

``````federated_server_type = tff.type_at_server(model_weights_type)
federated_dataset_type = tff.type_at_clients(tf_dataset_type)
``````

1. 服务器到客户端的广播步骤。
2. 本地客户端更新步骤。
3. 客户端到服务器的上传步骤。
4. 服务器更新步骤。

``````@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
# Broadcast the server weights to the clients.

# Each client computes their updated weights.
client_weights = tff.federated_map(
client_update_fn, (federated_dataset, server_weights_at_client))

# The server averages these updates.
mean_client_weights = tff.federated_mean(client_weights)

# The server updates its model.
server_weights = tff.federated_map(server_update_fn, mean_client_weights)

return server_weights
``````

``````federated_algorithm = tff.templates.IterativeProcess(
initialize_fn=initialize_fn,
next_fn=next_fn
)
``````

``````str(federated_algorithm.initialize.type_signature)
``````

``````str(federated_algorithm.next.type_signature)
``````

## 评估算法

``````central_emnist_test = emnist_test.create_tf_dataset_from_all_clients().take(1000)
central_emnist_test = preprocess(central_emnist_test)
``````

``````def evaluate(server_state):
keras_model = create_keras_model()
keras_model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
)
keras_model.set_weights(server_state)
keras_model.evaluate(central_emnist_test)
``````

``````server_state = federated_algorithm.initialize()
evaluate(server_state)
``````

``````for round in range(15):
server_state = federated_algorithm.next(server_state, federated_train_data)
``````
``````evaluate(server_state)
``````

[]
[]