本指南演示了如何将多工作进程分布式训练工作流从 TensorFlow 1 迁移到 TensorFlow 2。
使用 CPU/GPU 执行多工作进程训练:
- 在 TensorFlow 1 中,您通常会使用
API。 - 在 TensorFlow 2 中,使用 Keras API 编写模型、损失函数、优化器和指标。随后,利用 Keras
API 或自定义训练循环(使用tf.GradientTape
# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]
您将需要 'TF_CONFIG'
配置环境变量以在 TensorFlow 中的多台机器上进行训练。使用 'TF_CONFIG'
指定 'cluster'
和 'task'
import json
import os
tf_config = {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
'task': {'type': 'chief', 'index': 0}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
注:遗憾的是,由于在 TensorFlow 1 中使用 tf.estimator
API 进行多工作进程训练需要多个客户端(在此 Colab 笔记本中实现这一点会特别棘手),这会使笔记本在没有 'TF_CONFIG'
环境变量的情况下可运行,因此它会回退为本地训练。(有关详情,请参阅使用 TensorFlow 进行分布式训练指南中的设置 'TF_CONFIG'
使用 del
语句移除变量(但在 TensorFlow 1 中的实际多工作进程训练中,您不必这样做):
del os.environ['TF_CONFIG']
TensorFlow 1:使用 tf.estimator API 进行多工作进程分布式训练
以下代码段演示了 TF1 中多工作进程训练的规范工作流:您将使用 tf.estimator.Estimator
和 tf.estimator.train_and_evaluate
API 来分布训练:
def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)
def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)
def _model_fn(features, labels, mode):
logits = tf1.layers.Dense(1)(features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
TensorFlow 2:使用分布策略进行多工作进程训练
在 TensorFlow 2 中,使用 CPU、GPU 和 TPU 的多个工作进程之间的分布式训练是通过 tf.distribute.Strategy
和 tf.distribute.MultiWorkerMirroredStrategy
,这两种策略都是为使用多个工作进程进行 CPU/GPU 训练而设计的。
使用了一个协调器 ('chief'
),这使其对此 Colab 笔记本中的环境更加友好。您将在此处使用一些效用函数来设置可运行体验所必需的支持元素:您将创建一个进程内聚簇,其中线程用于模拟参数服务器 ('ps'
) 和工作进程 ('worker'
)。有关参数服务器训练的更多信息,请参阅使用 ParameterServerStrategy 进行参数服务器训练教程。
在此示例中,首先使用 tf.distribute.cluster_resolver.TFConfigClusterResolver
环境变量以提供聚簇信息。如果您使用聚簇管理系统进行分布式训练,请检查它是否已经提供了 'TF_CONFIG'
,如果已提供,则无需显式设置此环境变量。(有关详情,请参阅使用 TensorFlow 进行分布式训练指南中的设置 'TF_CONFIG'
# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]
# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps': ["localhost:%s" % port for port in ps_ports],
'task': {'type': 'chief', 'index': 0}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
然后,为工程进程和参数服务器一一创建 tf.distribute.Server
# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4
for i in range(3):
for i in range(2):
在现实世界的分布式训练中,您将使用多台机器而不是启动协调器上的所有 tf.distribute.Server
和 "ps"
(参数服务器)的机器将各自运行一个 tf.distribute.Server
一切准备就绪后,创建 ParameterServerStrategy
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:43271'], 'ps': ['localhost:46705', 'localhost:45915'], 'worker': ['localhost:39667', 'localhost:38207', 'localhost:35449']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:43271'], 'ps': ['localhost:46705', 'localhost:45915'], 'worker': ['localhost:39667', 'localhost:38207', 'localhost:35449']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0', '/job:chief/replica:0/task:0/device:GPU:1', '/job:chief/replica:0/task:0/device:GPU:2', '/job:chief/replica:0/task:0/device:GPU:3'], variable_device = '/device:CPU:0' INFO:tensorflow:Number of GPUs on workers: 4
创建策略对象后,定义模型、优化器和其他变量,然后在 Strategy.scope
API 中调用 Keras Model.compile
以分布训练。(如需了解详情,请参阅 Strategy.scope
API 文档。)
dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)
eval_dataset = tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
model.compile(optimizer, "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)
model.evaluate(eval_dataset, steps=10, return_dict=True)
分区器 (
)TensorFlow 2 中的
支持变量分区,并提供与 TensorFlow 1 相同,但名称不容易混淆的分区器:
或者,您也可以使用 MultiWorkerMirroredStrategy
# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0', '/device:GPU:1', '/device:GPU:2', '/device:GPU:3'), communication = CommunicationImplementation.AUTO
可以将上面使用的策略替换为 MultiWorkerMirroredStrategy
与 tf.estimator
API 一样,由于 MultiWorkerMirroredStrategy
是一种多客户端策略,在此 Colab 笔记本中运行分布式训练并不容易。因此,用这种策略替换上面的代码最终会以在本地运行结束。使用 Keras Model.fit/自定义训练循环的多工作进程训练教程演示了在设置 'TF_CONFIG'
变量的情况下,如何在 Colab 的本地主机上使用两个工作进程运行多工作进程训练。在实践中,您将在外部 IP 地址/端口上创建多个工作进程,并使用 'TF_CONFIG'
要详细了解如何在 TensorFlow 2 中使用 tf.distribute.experimental.ParameterServerStrategy
和 tf.distribute.MultiWorkerMirroredStrategy