Ceci est une alternative à la construire votre propre algorithme d' apprentissage fédéré tutoriel et le simple_fedavg exemple pour construire un processus itératif personnalisé pour la fédérée moyenne algorithme. Ce tutoriel utilisera optimiseurs TFF au lieu de optimiseurs KERAS. L'abstraction de l'optimiseur TFF est conçue pour être état-in-état-out afin d'être plus facile à incorporer dans un processus itératif TFF. Les tff.learning
API acceptent également optimiseurs TFF comme argument d'entrée.
Avant de commencer
Avant de commencer, veuillez exécuter ce qui suit pour vous assurer que votre environnement est correctement configuré. Si vous ne voyez pas un message d' accueil, s'il vous plaît se référer à l' installation guide pour les instructions.
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
import functools
import attr
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
Préparation des données et du modèle
Le traitement des données de EMNIST et le modèle sont très similaires à l' simple_fedavg exemple.
# Load dataset.
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(only_digits)
# Define preprocessing functions.
def preprocess_fn(dataset, batch_size=16):
def batch_format_fn(element):
return (tf.expand_dims(element['pixels'], -1), element['label'])
return dataset.batch(batch_size).map(batch_format_fn)
# Preprocess and sample clients for prototyping.
train_client_ids = sorted(emnist_train.client_ids)
train_data = emnist_train.preprocess(preprocess_fn)
central_test_data = preprocess_fn(
# Define model.
def create_keras_model():
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
data_format = 'channels_last'
input_shape = [28, 28, 1]
max_pool = functools.partial(
pool_size=(2, 2),
conv2d = functools.partial(
model = tf.keras.models.Sequential([
conv2d(filters=32, input_shape=input_shape),
tf.keras.layers.Dense(512, activation=tf.nn.relu),
tf.keras.layers.Dense(10 if only_digits else 62),
return model
# Wrap as `tff.learning.Model`.
def model_fn():
keras_model = create_keras_model()
return tff.learning.from_keras_model(
Processus itératif personnalisé
Dans de nombreux cas, les algorithmes fédérés ont 4 composants principaux :
- Une étape de diffusion de serveur à client.
- Une étape de mise à jour du client local.
- Une étape de téléchargement client-serveur.
- Une étape de mise à jour du serveur.
Dans TFF, nous représentons généralement des algorithmes fédérés comme tff.templates.IterativeProcess
(que nous appelons juste un IterativeProcess
tout au long). Ceci est une classe qui contient initialize
et next
fonctions. Ici, initialize
est utilisé pour initialiser le serveur, et next
effectuera un tour de communication de l'algorithme fédéré.
Nous introduirons différents composants pour construire l'algorithme de moyenne fédérée (FedAvg), qui utilisera un optimiseur à l'étape de mise à jour du client et un autre optimiseur à l'étape de mise à jour du serveur. Les logiques de base des mises à jour client et serveur peuvent être exprimées sous forme de blocs TF purs.
Blocs TF : mise à jour client et serveur
Sur chaque client, local client_optimizer
est initialisé et utilisé pour mettre à jour les poids du modèle client. Sur le serveur, server_optimizer
utilisera l'état du tour précédent, et mettre à jour l'état pour le prochain tour.
def client_update(model, dataset, server_weights, client_optimizer):
"""Performs local training on the client's dataset."""
# Initialize the client model with the current server weights.
client_weights = model.trainable_variables
# Assign the server weights to the client model.
tf.nest.map_structure(lambda x, y: x.assign(y),
client_weights, server_weights)
# Initialize the client optimizer.
trainable_tensor_specs = tf.nest.map_structure(
lambda v: tf.TensorSpec(v.shape, v.dtype), client_weights)
optimizer_state = client_optimizer.initialize(trainable_tensor_specs)
# Use the client_optimizer to update the local model.
for batch in iter(dataset):
with tf.GradientTape() as tape:
# Compute a forward pass on the batch of data.
outputs = model.forward_pass(batch)
# Compute the corresponding gradient.
grads = tape.gradient(outputs.loss, client_weights)
# Apply the gradient using a client optimizer.
optimizer_state, updated_weights = client_optimizer.next(
optimizer_state, client_weights, grads)
tf.nest.map_structure(lambda a, b: a.assign(b),
client_weights, updated_weights)
# Return model deltas.
return tf.nest.map_structure(tf.subtract, client_weights, server_weights)
@attr.s(eq=False, frozen=True, slots=True)
class ServerState(object):
trainable_weights = attr.ib()
optimizer_state = attr.ib()
def server_update(server_state, mean_model_delta, server_optimizer):
"""Updates the server model weights."""
# Use aggregated negative model delta as pseudo gradient.
negative_weights_delta = tf.nest.map_structure(
lambda w: -1.0 * w, mean_model_delta)
new_optimizer_state, updated_weights = server_optimizer.next(
server_state.optimizer_state, server_state.trainable_weights,
return tff.structure.update_struct(
Blocs TFF: tff.tf_computation
et tff.federated_computation
Nous utilisons maintenant TFF pour l'orchestration et construisons le processus itératif pour FedAvg. Nous devons envelopper les blocs de TF définis ci - dessus avec tff.tf_computation
, et les méthodes de TFF utilisation tff.federated_broadcast
, tff.federated_map
, tff.federated_mean
dans une tff.federated_computation
fonction. Il est facile d'utiliser les tff.learning.optimizers.Optimizer
API avec initialize
et next
fonctions lors de la définition d' un processus itératif personnalisé.
# 1. Server and client optimizer to be used.
server_optimizer = tff.learning.optimizers.build_sgdm(
learning_rate=0.05, momentum=0.9)
client_optimizer = tff.learning.optimizers.build_sgdm(
# 2. Functions return initial state on server.
def server_init():
model = model_fn()
trainable_tensor_specs = tf.nest.map_structure(
lambda v: tf.TensorSpec(v.shape, v.dtype), model.trainable_variables)
optimizer_state = server_optimizer.initialize(trainable_tensor_specs)
return ServerState(
def server_init_tff():
return tff.federated_value(server_init(), tff.SERVER)
# 3. One round of computation and communication.
server_state_type = server_init.type_signature.result
trainable_weights_type = server_state_type.trainable_weights
# 3-1. Wrap server and client TF blocks with `tff.tf_computation`.
@tff.tf_computation(server_state_type, trainable_weights_type)
def server_update_fn(server_state, model_delta):
return server_update(server_state, model_delta, server_optimizer)
whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
@tff.tf_computation(tf_dataset_type, trainable_weights_type)
def client_update_fn(dataset, server_weights):
model = model_fn()
return client_update(model, dataset, server_weights, client_optimizer)
# 3-2. Orchestration with `tff.federated_computation`.
federated_server_type = tff.FederatedType(server_state_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
@tff.federated_computation(federated_server_type, federated_dataset_type)
def run_one_round(server_state, federated_dataset):
# Server-to-client broadcast.
server_weights_at_client = tff.federated_broadcast(
# Local client update.
model_deltas = tff.federated_map(
client_update_fn, (federated_dataset, server_weights_at_client))
# Client-to-server upload and aggregation.
mean_model_delta = tff.federated_mean(model_deltas)
# Server update.
server_state = tff.federated_map(
server_update_fn, (server_state, mean_model_delta))
return server_state
# 4. Build the iterative process for FedAvg.
fedavg_process = tff.templates.IterativeProcess(
initialize_fn=server_init_tff, next_fn=run_one_round)
print('type signature of `initialize`:\n',
print('type signature of `next`:\n',
server_state_type: < trainable_weights=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] >, optimizer_state=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] > > trainable_weights_type: < float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] > tf_dataset_type: < float32[?,28,28,1], int32[?] >* type signature of `initialize`: ( -> < trainable_weights=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] >, optimizer_state=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] > >@SERVER) type signature of `next`: (< server_state=< trainable_weights=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] >, optimizer_state=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] > >@SERVER, federated_dataset={< float32[?,28,28,1], int32[?] >*}@CLIENTS > -> < trainable_weights=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] >, optimizer_state=< float32[5,5,1,32], float32[32], float32[5,5,32,64], float32[64], float32[3136,512], float32[512], float32[512,10], float32[10] > >@SERVER)
Évaluation de l'algorithme
Nous évaluons la performance sur un ensemble de données d'évaluation centralisé.
def evaluate(server_state):
keras_model = create_keras_model()
lambda var, t: var.assign(t),
keras_model.trainable_weights, server_state.trainable_weights)
metric = tf.keras.metrics.SparseCategoricalAccuracy()
for batch in iter(central_test_data):
preds = keras_model(batch[0], training=False)
metric.update_state(y_true=batch[1], y_pred=preds)
return metric.result().numpy()
server_state = fedavg_process.initialize()
acc = evaluate(server_state)
print('Initial test accuracy', acc)
# Evaluate after a few rounds
sampled_clients = train_client_ids[:CLIENTS_PER_ROUND]
sampled_train_data = [
for client in sampled_clients]
for round in range(20):
server_state = fedavg_process.next(server_state, sampled_train_data)
acc = evaluate(server_state)
print('Test accuracy', acc)
Initial test accuracy 0.09677419 Test accuracy 0.13978495