This tutorial describes how to set up high-performance simulations with a TFF runtime running on Kubernetes. The model is the same as in the previous tutorial, High-performance simulations with TFF; the only difference is that here we use a worker pool instead of a local executor.
This tutorial refers to Google Cloud's GKE to create the Kubernetes cluster, but all the steps after the cluster is created will work with any Kubernetes installation.
Launch the TFF workers on GKE
Note: This tutorial assumes you have an existing GCP project.
Create a Kubernetes cluster
The following step only needs to be done once. The cluster can be re-used for future workloads.
Follow the GKE instructions to create a container cluster. The rest of this tutorial assumes that the cluster is named tff-cluster, but the actual name isn't important. Stop following the instructions when you get to "Step 5: Deploy your application".
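For reference, the same cluster can also be created from the command line; the zone and node count below are illustrative placeholders, not values prescribed by the GKE guide:
$ gcloud container clusters create tff-cluster --zone us-central1-a --num-nodes 3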
Deploy the TFF worker application
The commands to interact with GCP can be run locally or in the Google Cloud Shell. We recommend the Google Cloud Shell since it doesn't require additional setup.
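If you do run the commands locally rather than in the Cloud Shell, first point kubectl at the new cluster; the zone below is a placeholder, use the one your cluster was created in:
$ gcloud container clusters get-credentials tff-cluster --zone us-central1-a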
- Run the following command to launch the Kubernetes application.
$ kubectl create deployment tff-workers --image=gcr.io/tensorflow-federated/remote-executor-service:{{version}}
- Add a load balancer for the application.
$ kubectl expose deployment tff-workers --type=LoadBalancer --port 80 --target-port 8000
Note: This exposes your deployment to the internet, and is for demo purposes only. For production use, a firewall and authentication are strongly recommended.
Look up the IP address of the load balancer in the Google Cloud Console. You'll need it later to connect the training loop to the worker application.
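Alternatively, you can read it off the command line; the EXTERNAL-IP column in the output of the following command is the address to use:
$ kubectl get service tff-workers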
(Alternatively) Launch a Docker container locally
$ docker run --rm -p 8000:8000 gcr.io/tensorflow-federated/remote-executor-service:{{version}}
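If you use this local option instead of GKE, the remote executor setup later in this tutorial should point at the container rather than a load balancer; a sketch using the same variable names as the code below:

ip_address = 'localhost'  # The container publishes port 8000 locally.
port = 8000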
Set up the TFF environment
!pip install --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
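As an optional sanity check that the installation works, you can run a trivial federated computation (at this point it still executes locally); a minimal sketch:

# Optional: should print b'Hello, World!' if TFF is installed correctly.
import tensorflow_federated as tff

print(tff.federated_computation(lambda: 'Hello, World!')())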
Define the model to train
import collections
import time

import tensorflow as tf
import tensorflow_federated as tff

# Load the federated EMNIST data and build one dataset per client.
source, _ = tff.simulation.datasets.emnist.load_data()


def map_fn(example):
  return collections.OrderedDict(
      x=tf.reshape(example['pixels'], [-1, 784]), y=example['label'])


def client_data(n):
  ds = source.create_tf_dataset_for_client(source.client_ids[n])
  return ds.repeat(10).batch(20).map(map_fn)


train_data = [client_data(n) for n in range(10)]
input_spec = train_data[0].element_spec
def model_fn():
  # A simple softmax regression model over the flattened 28x28 pixels.
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(units=10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])
  return tff.learning.from_keras_model(
      model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
    model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.02))
def evaluate(num_rounds=10):
  state = trainer.initialize()
  for round_num in range(num_rounds):
    t1 = time.time()
    state, metrics = trainer.next(state, train_data)
    t2 = time.time()
    print('Round {}: loss {}, round time {}'.format(
        round_num, metrics.loss, t2 - t1))
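Before switching to the remote workers, you can optionally smoke-test the trainer in the default local execution context; a minimal sketch that runs a single round over just two clients:

# Optional local smoke test: one round over two clients on the local executor.
state = trainer.initialize()
state, metrics = trainer.next(state, train_data[:2])
print(metrics)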
Set up the remote executor
By default, TFF executes all computations locally. In this step we tell TFF to connect to the Kubernetes services we set up above. Be sure to copy the IP address of your service here.
import grpc

ip_address = '0.0.0.0'  # Replace with your load balancer's IP address.
port = 80

# Open one channel per client so the ten clients can be processed in parallel.
channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]

tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')
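If you later want to return to purely local execution (for example, after tearing down the cluster), the same backend module provides a corresponding setter:

# Restore the default local execution context.
tff.backends.native.set_local_execution_context()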
Run the training
evaluate()
Round 0: loss 4.370407581329346, round time 4.201097726821899
Round 1: loss 4.1407670974731445, round time 3.3283166885375977
Round 2: loss 3.865147590637207, round time 3.098310947418213
Round 3: loss 3.534019708633423, round time 3.1565616130828857
Round 4: loss 3.272688388824463, round time 3.175067663192749
Round 5: loss 2.935391664505005, round time 3.008434534072876
Round 6: loss 2.7399251461029053, round time 3.31435227394104
Round 7: loss 2.5054931640625, round time 3.4411356449127197
Round 8: loss 2.290508985519409, round time 3.158798933029175
Round 9: loss 2.1194536685943604, round time 3.1348156929016113
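When you're done experimenting, delete the cluster so it doesn't keep accruing charges:
$ gcloud container clusters delete tff-cluster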