Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Visão geral
Este tutorial demonstra como tf.distribute.Strategy
pode ser usado para treinamento distribuído de vários trabalhadores com tf.estimator
. Se você escreve seu código usando tf.estimator
e está interessado em escalar além de uma única máquina com alto desempenho, este tutorial é para você.
Antes de começar, leia o guia de estratégia de distribuição . O tutorial de treinamento multi-GPU também é relevante, pois este tutorial usa o mesmo modelo.
Configurar
Primeiro, configure o TensorFlow e as importações necessárias.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
Função de entrada
Este tutorial usa o conjunto de dados MNIST do TensorFlow Datasets . O código aqui é semelhante ao tutorial de treinamento multi-GPU com uma diferença importante: ao usar o Estimator para treinamento de vários trabalhadores, é necessário fragmentar o conjunto de dados pelo número de trabalhadores para garantir a convergência do modelo. Os dados de entrada são fragmentados pelo índice do trabalhador, para que cada trabalhador processe 1/num_workers
partes distintas do conjunto de dados.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
Outra abordagem razoável para alcançar a convergência seria embaralhar o conjunto de dados com sementes distintas em cada trabalhador.
Configuração de vários trabalhadores
Uma das principais diferenças neste tutorial (em comparação com o tutorial de treinamento multi-GPU ) é a configuração de vários trabalhadores. A variável de ambiente TF_CONFIG
é a maneira padrão de especificar a configuração do cluster para cada trabalhador que faz parte do cluster.
Existem dois componentes do TF_CONFIG
: cluster
e task
. cluster
fornece informações sobre todo o cluster, ou seja, os trabalhadores e os servidores de parâmetros no cluster. task
fornece informações sobre a tarefa atual. O primeiro cluster
componentes é o mesmo para todos os trabalhadores e servidores de parâmetros no cluster, e a segunda task
de componente é diferente em cada trabalhador e servidor de parâmetros e especifica seu próprio type
e index
. Neste exemplo, o type
tarefa é worker
e o index
de tarefa é 0
.
Para fins de ilustração, este tutorial mostra como definir um TF_CONFIG
com 2 workers em localhost
. Na prática, você criaria vários trabalhadores em um endereço IP e porta externos e TF_CONFIG
em cada trabalhador adequadamente, ou seja, modificaria o index
da tarefa .
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
Defina o modelo
Escreva as camadas, o otimizador e a função de perda para treinamento. Este tutorial define o modelo com camadas Keras, semelhante ao tutorial de treinamento multi-GPU .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredStrategy
Para treinar o modelo, use uma instância de tf.distribute.experimental.MultiWorkerMirroredStrategy
. MultiWorkerMirroredStrategy
cria cópias de todas as variáveis nas camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps
, uma operação do TensorFlow para comunicação coletiva, para agregar gradientes e manter as variáveis em sincronia. O guia tf.distribute.Strategy
tem mais detalhes sobre essa estratégia.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
Treinar e avaliar o modelo
Em seguida, especifique a estratégia de distribuição no RunConfig
para o estimador e treine e avalie invocando tf.estimator.train_and_evaluate
. Este tutorial distribui apenas o treinamento especificando a estratégia via train_distribute
. Também é possível distribuir a avaliação via eval_distribute
.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
Otimize o desempenho do treinamento
Agora você tem um modelo e um estimador com capacidade para vários trabalhadores, desenvolvido por tf.distribute.Strategy
. Você pode tentar as seguintes técnicas para otimizar o desempenho do treinamento de vários trabalhadores:
- Aumente o tamanho do lote: O tamanho do lote especificado aqui é por GPU. Em geral, é aconselhável o maior tamanho de lote que se ajuste à memória da GPU.
- Cast variáveis: Cast as variáveis para
tf.float
se possível. O modelo oficial da ResNet inclui um exemplo de como isso pode ser feito. Use comunicação coletiva:
MultiWorkerMirroredStrategy
fornece várias implementações de comunicação coletiva .- O
RING
implementa coletivos baseados em anel usando gRPC como a camada de comunicação entre hosts. -
NCCL
usa NCCL da Nvidia para implementar coletivos. -
AUTO
adia a escolha para o tempo de execução.
A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique um valor válido para o parâmetro de
communication
do construtor deMultiWorkerMirroredStrategy
, por exemplo,communication=tf.distribute.experimental.CollectiveCommunication.NCCL
.- O
Visite a seção Desempenho no guia para saber mais sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.
Outros exemplos de código
- Exemplo de ponta a ponta para treinamento de vários trabalhadores no tensorflow/ecossistema usando modelos do Kubernetes. Este exemplo começa com um modelo Keras e o converte em um estimador usando a API
tf.keras.estimator.model_to_estimator
. - Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.