Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
Обзор
В этом руководстве показано, как tf.distribute.Strategy
можно использовать для распределенного обучения нескольких сотрудников с помощью tf.estimator
. Если вы пишете свой код с помощью tf.estimator
и заинтересованы в масштабировании за пределы одной машины с высокой производительностью, это руководство для вас.
Прежде чем начать, ознакомьтесь с руководством по стратегии распространения . Учебное пособие по работе с несколькими графическими процессорами также актуально, поскольку в этом руководстве используется та же модель.
Настраивать
Сначала настройте TensorFlow и необходимый импорт.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
Функция ввода
В этом руководстве используется набор данных MNIST из TensorFlow Datasets . Код здесь аналогичен учебному пособию с несколькими графическими процессорами с одним ключевым отличием: при использовании Estimator для обучения нескольких рабочих необходимо сегментировать набор данных по количеству рабочих, чтобы обеспечить сходимость модели. Входные данные сегментируются по рабочему индексу, так что каждый рабочий процесс обрабатывает 1/num_workers
отдельных частей набора данных.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
Другим разумным подходом к достижению конвергенции было бы перемешивание набора данных с разными начальными значениями для каждого рабочего.
Многопользовательская конфигурация
Одним из ключевых отличий этого руководства (по сравнению с учебным пособием по работе с несколькими графическими процессорами ) является настройка нескольких рабочих мест. Переменная среды TF_CONFIG
— это стандартный способ указать конфигурацию кластера для каждого рабочего процесса, который является частью кластера.
Есть два компонента TF_CONFIG
: cluster
и task
. cluster
предоставляет информацию обо всем кластере, а именно о рабочих процессах и серверах параметров в кластере. task
предоставляет информацию о текущей задаче. Первый компонент cluster
одинаков для всех рабочих процессов и серверов параметров в кластере, а task
второго компонента отличается для каждого рабочего процесса и сервера параметров и указывает свой type
и index
. В этом примере type
задачи — worker
, а index
задачи — 0
.
В целях иллюстрации в этом руководстве показано, как настроить TF_CONFIG
с двумя рабочими процессами на localhost
хосте. На практике вы должны создать несколько воркеров на внешнем IP-адресе и порту и соответствующим образом установить TF_CONFIG
для каждого воркера, т.е. изменить index
задачи.
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
Определить модель
Напишите слои, оптимизатор и функцию потерь для обучения. В этом учебном пособии определяется модель со слоями Keras, аналогично учебному пособию по работе с несколькими графическими процессорами .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredСтратегии
Для обучения модели используйте экземпляр tf.distribute.experimental.MultiWorkerMirroredStrategy
. MultiWorkerMirroredStrategy
создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих процессов. Он использует CollectiveOps
, операцию TensorFlow для коллективной коммуникации, для объединения градиентов и синхронизации переменных. Руководство tf.distribute.Strategy
содержит более подробную информацию об этой стратегии.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
Обучите и оцените модель
Затем укажите стратегию распределения в RunConfig
для средства оценки и обучите и оцените, вызвав tf.estimator.train_and_evaluate
. Этот учебник распространяет только обучение, указав стратегию через train_distribute
. Также можно распространять оценку через eval_distribute
.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
Оптимизация тренировок
Теперь у вас есть модель и оценщик с поддержкой нескольких рабочих на tf.distribute.Strategy
. Вы можете попробовать следующие методы для оптимизации производительности обучения нескольких сотрудников:
- Увеличьте размер пакета: указанный здесь размер пакета относится к графическому процессору. Как правило, рекомендуется наибольший размер пакета, который соответствует памяти графического процессора.
- Приведение переменных: приведение переменных к
tf.float
, если это возможно. Официальная модель ResNet включает пример того, как это можно сделать. Используйте коллективную коммуникацию:
MultiWorkerMirroredStrategy
предоставляет несколько реализаций коллективной коммуникации .-
RING
реализует коллективы на основе кольца, используя gRPC в качестве уровня связи между узлами. -
NCCL
использует NCCL от Nvidia для реализации коллективов. -
AUTO
откладывает выбор до среды выполнения.
Лучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также сетевого соединения в кластере. Чтобы переопределить автоматический выбор, укажите допустимое значение для параметра
communication
конструктораMultiWorkerMirroredStrategy
, например,communication=tf.distribute.experimental.CollectiveCommunication.NCCL
.-
Посетите раздел « Производительность» в руководстве, чтобы узнать больше о других стратегиях и инструментах , которые вы можете использовать для оптимизации производительности своих моделей TensorFlow.
Другие примеры кода
- Сквозной пример обучения нескольких сотрудников tensorflow/экосистеме с использованием шаблонов Kubernetes. Этот пример начинается с модели Keras и преобразует ее в Estimator с помощью API
tf.keras.estimator.model_to_estimator
. - Официальные модели , многие из которых можно настроить для запуска нескольких стратегий распространения.