Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy

Veja no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra o treinamento de vários trabalhadores com API de loop de treinamento personalizado, distribuído via MultiWorkerMirroredStrategy, para que um modelo Keras projetado para ser executado em um único trabalhador possa funcionar perfeitamente em vários trabalhadores com alteração mínima de código.

Estamos usando loops de treinamento personalizados para treinar nosso modelo porque eles nos dão flexibilidade e maior controle no treinamento. Além disso, é mais fácil depurar o modelo e o loop de treinamento. Informações mais detalhadas estão disponíveis em Escrevendo um loop de treinamento do zero .

Se você estiver procurando como usar o MultiWorkerMirroredStrategy com keras model.fit , consulte este tutorial .

O guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição que o TensorFlow suporta para os interessados ​​em uma compreensão mais profunda das APIs tf.distribute.Strategy .

Configurar

Primeiro, algumas importações necessárias.

import json
import os
import sys

Antes de importar o TensorFlow, faça algumas alterações no ambiente.

Desative todas as GPUs. Isso evita erros causados ​​por todos os trabalhadores tentando usar a mesma GPU. Para uma aplicação real, cada trabalhador estaria em uma máquina diferente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Redefina a variável de ambiente TF_CONFIG , você verá mais sobre isso mais tarde.

os.environ.pop('TF_CONFIG', None)

Certifique-se de que o diretório atual esteja no caminho do python. Isso permite que o notebook importe os arquivos gravados por %%writefile posteriormente.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Agora importe o TensorFlow.

import tensorflow as tf

Definição de conjunto de dados e modelo

Em seguida, crie um arquivo mnist.py com uma configuração simples de modelo e conjunto de dados. Este arquivo python será usado pelos processos de trabalho neste tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configuração de vários trabalhadores

Agora vamos entrar no mundo do treinamento de vários trabalhadores. No TensorFlow, a variável de ambiente TF_CONFIG é necessária para treinamento em várias máquinas, cada uma com uma função diferente. TF_CONFIG usado abaixo, é uma string JSON usada para especificar a configuração do cluster em cada trabalhador que faz parte do cluster. Este é o método padrão para especificar um cluster, usando cluster_resolver.TFConfigClusterResolver , mas há outras opções disponíveis no módulo distribute.cluster_resolver .

Descreva seu cluster

Aqui está um exemplo de configuração:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aqui está o mesmo TF_CONFIG serializado como uma string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Existem dois componentes do TF_CONFIG : cluster e task .

  • cluster é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de treinamento, que é um dict que consiste em diferentes tipos de trabalhos, como worker . No treinamento de vários trabalhadores com o MultiWorkerMirroredStrategy , geralmente há um worker que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever o arquivo de resumo para o TensorBoard, além do que um worker comum faz. Tal trabalhador é referido como o trabalhador chief , e é costume que o worker com index 0 seja apontado como o worker chefe (na verdade, é assim que tf.distribute.Strategy é implementado).

  • task fornece informações da tarefa atual e é diferente em cada trabalhador. Ele especifica o type e o index desse trabalhador.

Neste exemplo, você define o type de tarefa como "worker" e o index da tarefa como 0 . Esta máquina é a primeira operária e será apontada como a principal e fará mais trabalho que as demais. Observe que outras máquinas também precisarão ter a variável de ambiente TF_CONFIG definida, e ela deve ter o mesmo dict de cluster , mas um type de tarefa ou index de tarefa diferente, dependendo de quais são as funções dessas máquinas.

Para fins de ilustração, este tutorial mostra como se pode definir um TF_CONFIG com 2 workers em localhost . Na prática, os usuários criariam vários trabalhadores em endereços/portas IP externas e TF_CONFIG em cada trabalhador adequadamente.

Neste exemplo você usará 2 trabalhadores, o TF_CONFIG do primeiro trabalhador é mostrado acima. Para o segundo trabalhador, você tf_config['task']['index']=1

Acima, tf_config é apenas uma variável local em python. Para realmente usá-lo para configurar o treinamento, esse dicionário precisa ser serializado como JSON e colocado na variável de ambiente TF_CONFIG .

Variáveis ​​de ambiente e subprocessos em notebooks

Os subprocessos herdam as variáveis ​​de ambiente de seu pai. Então, se você definir uma variável de ambiente neste processo jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Você pode acessar a variável de ambiente de um subprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Na próxima seção, você usará isso para passar o TF_CONFIG para os subprocessos de trabalho. Você nunca realmente iniciaria seus trabalhos dessa maneira, mas é suficiente para os propósitos deste tutorial: Demonstrar um exemplo mínimo de vários trabalhadores.

MultiWorkerMirroredStrategy

Para treinar o modelo, use uma instância de tf.distribute.MultiWorkerMirroredStrategy , que cria cópias de todas as variáveis ​​nas camadas do modelo em cada dispositivo em todos os trabalhadores. O guia tf.distribute.Strategy tem mais detalhes sobre essa estratégia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Use tf.distribute.Strategy.scope para especificar que uma estratégia deve ser usada ao construir seu modelo. Isso coloca você no " contexto de réplica cruzada " para essa estratégia, o que significa que a estratégia é colocada no controle de coisas como o posicionamento da variável.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Partilhe automaticamente seus dados entre os funcionários

No treinamento de vários trabalhadores, a fragmentação do conjunto de dados não é necessariamente necessária, mas fornece uma semântica exatamente uma vez que torna mais treinamento mais reprodutível, ou seja, o treinamento em vários trabalhadores deve ser o mesmo que o treinamento em um trabalhador. Nota: o desempenho pode ser afetado em alguns casos.

Veja: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Defina o loop de treinamento personalizado e treine o modelo

Especifique um otimizador

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Defina uma etapa de treinamento com tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Salvar e restaurar pontos de verificação

A implementação do ponto de verificação em um loop de treinamento personalizado exige que o usuário o manipule em vez de usar um retorno de chamada keras. Permite salvar os pesos do modelo e restaurá-los sem precisar salvar o modelo inteiro.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Aqui, você criará um tf.train.Checkpoint que rastreia o modelo, que é gerenciado por um tf.train.CheckpointManager para que apenas o último checkpoint seja preservado.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Agora, quando você precisar restaurar, poderá encontrar o último ponto de verificação salvo usando a conveniente função tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Depois de restaurar o ponto de verificação, você pode continuar treinando seu loop de treinamento personalizado.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Configuração de código completo em workers

Para realmente executar com o MultiWorkerMirroredStrategy , você precisará executar processos de trabalho e passar um TF_CONFIG para eles.

Como o arquivo mnist.py escrito anteriormente, aqui está o main.py que contém o mesmo código que percorremos passo a passo anteriormente neste colab, estamos apenas gravando-o em um arquivo para que cada um dos trabalhadores o execute:

Arquivo: main.py

Writing main.py

Treinar e avaliar

O diretório atual agora contém os dois arquivos Python:

ls *.py
main.py
mnist.py

Então json-serialize o TF_CONFIG e adicione-o às variáveis ​​de ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora, você pode iniciar um processo de trabalho que executará o main.py e usará o TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Há algumas coisas a serem observadas sobre o comando acima:

  1. Ele usa o %%bash que é uma "mágica" de notebook para executar alguns comandos do bash.
  2. Ele usa o sinalizador --bg para executar o processo bash em segundo plano, porque esse trabalhador não será encerrado. Ele espera por todos os trabalhadores antes de começar.

O processo de trabalho em segundo plano não imprimirá a saída neste notebook, então o &> redireciona sua saída para um arquivo, para que você possa ver o que aconteceu.

Então, espere alguns segundos para o processo iniciar:

import time
time.sleep(20)

Agora veja o que foi gerado no arquivo de log do trabalhador até agora:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345 . O primeiro trabalhador está agora pronto e está esperando que todos os outros trabalhadores estejam prontos para prosseguir.

Então atualize o tf_config para o processo do segundo trabalhador pegar:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora inicie o segundo trabalhador. Isso iniciará o treinamento, pois todos os trabalhadores estão ativos (portanto, não há necessidade de fazer esse processo em segundo plano):

python main.py > /dev/null 2>&1

Agora, se você verificar novamente os logs escritos pelo primeiro trabalhador, verá que ele participou do treinamento desse modelo:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Treinamento de vários trabalhadores em profundidade

Este tutorial demonstrou um fluxo de trabalho de Custom Training Loop da configuração de vários trabalhadores. Uma descrição detalhada de outros tópicos está disponível no model.fit's guide configuração de vários trabalhadores e aplicável a CTLs.

Veja também

  1. O guia Treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
  2. Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
  3. A seção Desempenho no guia fornece informações sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.