Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Visão geral
Este tutorial demonstra o treinamento de vários trabalhadores com API de loop de treinamento personalizado, distribuído via MultiWorkerMirroredStrategy, para que um modelo Keras projetado para ser executado em um único trabalhador possa funcionar perfeitamente em vários trabalhadores com alteração mínima de código.
Estamos usando loops de treinamento personalizados para treinar nosso modelo porque eles nos dão flexibilidade e maior controle no treinamento. Além disso, é mais fácil depurar o modelo e o loop de treinamento. Informações mais detalhadas estão disponíveis em Escrevendo um loop de treinamento do zero .
Se você estiver procurando como usar o MultiWorkerMirroredStrategy
com keras model.fit
, consulte este tutorial .
O guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição que o TensorFlow suporta para os interessados em uma compreensão mais profunda das APIs tf.distribute.Strategy
.
Configurar
Primeiro, algumas importações necessárias.
import json
import os
import sys
Antes de importar o TensorFlow, faça algumas alterações no ambiente.
Desative todas as GPUs. Isso evita erros causados por todos os trabalhadores tentando usar a mesma GPU. Para uma aplicação real, cada trabalhador estaria em uma máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Redefina a variável de ambiente TF_CONFIG
, você verá mais sobre isso mais tarde.
os.environ.pop('TF_CONFIG', None)
Certifique-se de que o diretório atual esteja no caminho do python. Isso permite que o notebook importe os arquivos gravados por %%writefile
posteriormente.
if '.' not in sys.path:
sys.path.insert(0, '.')
Agora importe o TensorFlow.
import tensorflow as tf
Definição de conjunto de dados e modelo
Em seguida, crie um arquivo mnist.py
com uma configuração simples de modelo e conjunto de dados. Este arquivo python será usado pelos processos de trabalho neste tutorial:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Configuração de vários trabalhadores
Agora vamos entrar no mundo do treinamento de vários trabalhadores. No TensorFlow, a variável de ambiente TF_CONFIG
é necessária para treinamento em várias máquinas, cada uma com uma função diferente. TF_CONFIG
usado abaixo, é uma string JSON usada para especificar a configuração do cluster em cada trabalhador que faz parte do cluster. Este é o método padrão para especificar um cluster, usando cluster_resolver.TFConfigClusterResolver
, mas há outras opções disponíveis no módulo distribute.cluster_resolver
.
Descreva seu cluster
Aqui está um exemplo de configuração:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Aqui está o mesmo TF_CONFIG
serializado como uma string JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Existem dois componentes do TF_CONFIG
: cluster
e task
.
cluster
é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de treinamento, que é um dict que consiste em diferentes tipos de trabalhos, comoworker
. No treinamento de vários trabalhadores com oMultiWorkerMirroredStrategy
, geralmente há umworker
que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever o arquivo de resumo para o TensorBoard, além do que umworker
comum faz. Tal trabalhador é referido como o trabalhadorchief
, e é costume que oworker
comindex
0 seja apontado como oworker
chefe (na verdade, é assim quetf.distribute.Strategy
é implementado).task
fornece informações da tarefa atual e é diferente em cada trabalhador. Ele especifica otype
e oindex
desse trabalhador.
Neste exemplo, você define o type
de tarefa como "worker"
e o index
da tarefa como 0
. Esta máquina é a primeira operária e será apontada como a principal e fará mais trabalho que as demais. Observe que outras máquinas também precisarão ter a variável de ambiente TF_CONFIG
definida, e ela deve ter o mesmo dict de cluster
, mas um type
de tarefa ou index
de tarefa diferente, dependendo de quais são as funções dessas máquinas.
Para fins de ilustração, este tutorial mostra como se pode definir um TF_CONFIG
com 2 workers em localhost
. Na prática, os usuários criariam vários trabalhadores em endereços/portas IP externas e TF_CONFIG
em cada trabalhador adequadamente.
Neste exemplo você usará 2 trabalhadores, o TF_CONFIG
do primeiro trabalhador é mostrado acima. Para o segundo trabalhador, você tf_config['task']['index']=1
Acima, tf_config
é apenas uma variável local em python. Para realmente usá-lo para configurar o treinamento, esse dicionário precisa ser serializado como JSON e colocado na variável de ambiente TF_CONFIG
.
Variáveis de ambiente e subprocessos em notebooks
Os subprocessos herdam as variáveis de ambiente de seu pai. Então, se você definir uma variável de ambiente neste processo jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Você pode acessar a variável de ambiente de um subprocesso:
echo ${GREETINGS}
Hello TensorFlow!
Na próxima seção, você usará isso para passar o TF_CONFIG
para os subprocessos de trabalho. Você nunca realmente iniciaria seus trabalhos dessa maneira, mas é suficiente para os propósitos deste tutorial: Demonstrar um exemplo mínimo de vários trabalhadores.
MultiWorkerMirroredStrategy
Para treinar o modelo, use uma instância de tf.distribute.MultiWorkerMirroredStrategy
, que cria cópias de todas as variáveis nas camadas do modelo em cada dispositivo em todos os trabalhadores. O guia tf.distribute.Strategy
tem mais detalhes sobre essa estratégia.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Use tf.distribute.Strategy.scope
para especificar que uma estratégia deve ser usada ao construir seu modelo. Isso coloca você no " contexto de réplica cruzada " para essa estratégia, o que significa que a estratégia é colocada no controle de coisas como o posicionamento da variável.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Partilhe automaticamente seus dados entre os funcionários
No treinamento de vários trabalhadores, a fragmentação do conjunto de dados não é necessariamente necessária, mas fornece uma semântica exatamente uma vez que torna mais treinamento mais reprodutível, ou seja, o treinamento em vários trabalhadores deve ser o mesmo que o treinamento em um trabalhador. Nota: o desempenho pode ser afetado em alguns casos.
Veja: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Defina o loop de treinamento personalizado e treine o modelo
Especifique um otimizador
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Defina uma etapa de treinamento com tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Salvar e restaurar pontos de verificação
A implementação do ponto de verificação em um loop de treinamento personalizado exige que o usuário o manipule em vez de usar um retorno de chamada keras. Permite salvar os pesos do modelo e restaurá-los sem precisar salvar o modelo inteiro.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Aqui, você criará um tf.train.Checkpoint
que rastreia o modelo, que é gerenciado por um tf.train.CheckpointManager
para que apenas o último checkpoint seja preservado.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Agora, quando você precisar restaurar, poderá encontrar o último ponto de verificação salvo usando a conveniente função tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
Depois de restaurar o ponto de verificação, você pode continuar treinando seu loop de treinamento personalizado.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Configuração de código completo em workers
Para realmente executar com o MultiWorkerMirroredStrategy
, você precisará executar processos de trabalho e passar um TF_CONFIG
para eles.
Como o arquivo mnist.py
escrito anteriormente, aqui está o main.py
que contém o mesmo código que percorremos passo a passo anteriormente neste colab, estamos apenas gravando-o em um arquivo para que cada um dos trabalhadores o execute:
Arquivo: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Treinar e avaliar
O diretório atual agora contém os dois arquivos Python:
ls *.py
main.py mnist.py
Então json-serialize o TF_CONFIG
e adicione-o às variáveis de ambiente:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Agora, você pode iniciar um processo de trabalho que executará o main.py
e usará o TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Há algumas coisas a serem observadas sobre o comando acima:
- Ele usa o
%%bash
que é uma "mágica" de notebook para executar alguns comandos do bash. - Ele usa o sinalizador
--bg
para executar o processobash
em segundo plano, porque esse trabalhador não será encerrado. Ele espera por todos os trabalhadores antes de começar.
O processo de trabalho em segundo plano não imprimirá a saída neste notebook, então o &>
redireciona sua saída para um arquivo, para que você possa ver o que aconteceu.
Então, espere alguns segundos para o processo iniciar:
import time
time.sleep(20)
Agora veja o que foi gerado no arquivo de log do trabalhador até agora:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345
. O primeiro trabalhador está agora pronto e está esperando que todos os outros trabalhadores estejam prontos para prosseguir.
Então atualize o tf_config
para o processo do segundo trabalhador pegar:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Agora inicie o segundo trabalhador. Isso iniciará o treinamento, pois todos os trabalhadores estão ativos (portanto, não há necessidade de fazer esse processo em segundo plano):
python main.py > /dev/null 2>&1
Agora, se você verificar novamente os logs escritos pelo primeiro trabalhador, verá que ele participou do treinamento desse modelo:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Treinamento de vários trabalhadores em profundidade
Este tutorial demonstrou um fluxo de trabalho de Custom Training Loop
da configuração de vários trabalhadores. Uma descrição detalhada de outros tópicos está disponível no model.fit's guide
configuração de vários trabalhadores e aplicável a CTLs.
Veja também
- O guia Treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
- Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
- A seção Desempenho no guia fornece informações sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.