Treinamento para vários trabalhadores com Keras

Veja no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra como realizar treinamento distribuído para vários trabalhadores com um modelo Keras e a API Model.fit usando a API tf.distribute.Strategy — especificamente a classe tf.distribute.MultiWorkerMirroredStrategy . Com a ajuda dessa estratégia, um modelo Keras projetado para ser executado em um único trabalhador pode funcionar perfeitamente em vários trabalhadores com alterações mínimas de código.

Para aqueles interessados ​​em uma compreensão mais profunda das APIs tf.distribute.Strategy , o guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição suportadas pelo TensorFlow.

Para saber como usar o MultiWorkerMirroredStrategy com Keras e um loop de treinamento personalizado, consulte Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy .

Observe que o objetivo deste tutorial é demonstrar um exemplo mínimo de vários trabalhadores com dois trabalhadores.

Configurar

Comece com algumas importações necessárias:

import json
import os
import sys

Antes de importar o TensorFlow, faça algumas alterações no ambiente:

  1. Desative todas as GPUs. Isso evita erros causados ​​por todos os trabalhadores tentando usar a mesma GPU. Em uma aplicação do mundo real, cada trabalhador estaria em uma máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Redefina a variável de ambiente TF_CONFIG (você aprenderá mais sobre isso mais tarde):
os.environ.pop('TF_CONFIG', None)
  1. Certifique-se de que o diretório atual esteja no caminho do Python—isso permite que o notebook importe os arquivos escritos por %%writefile posteriormente:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Agora importe o TensorFlow:

import tensorflow as tf

Definição de conjunto de dados e modelo

Em seguida, crie um arquivo mnist_setup.py com uma configuração simples de modelo e conjunto de dados. Este arquivo Python será usado pelos processos de trabalho neste tutorial:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Treinamento de modelo em um único trabalhador

Tente treinar o modelo por um pequeno número de épocas e observe os resultados de um único trabalhador para garantir que tudo funcione corretamente. À medida que o treinamento progride, a perda deve cair e a precisão deve aumentar.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Configuração de vários trabalhadores

Agora vamos entrar no mundo do treinamento de vários trabalhadores.

Um cluster com jobs e tarefas

No TensorFlow, o treinamento distribuído envolve: um 'cluster' com várias tarefas, e cada uma das tarefas pode ter uma ou mais 'task' .

Você precisará da variável de ambiente de configuração TF_CONFIG para treinar em várias máquinas, cada uma das quais possivelmente com uma função diferente. TF_CONFIG é uma string JSON usada para especificar a configuração do cluster para cada trabalhador que faz parte do cluster.

Existem dois componentes de uma variável TF_CONFIG : 'cluster' e 'task' .

  • Um 'cluster' é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de treinamento, que é um dict composto por diferentes tipos de empregos, como 'worker' ou 'chief' .

    • No treinamento de vários trabalhadores com tf.distribute.MultiWorkerMirroredStrategy , geralmente há um 'worker' que assume responsabilidades, como salvar um ponto de verificação e escrever um arquivo de resumo para o TensorBoard, além do que um 'worker' normal faz. Tal 'worker' é referido como o trabalhador chefe (com um nome de trabalho 'chief' ).
    • É costume que o 'chief' tenha 'index' 0 para ser nomeado (na verdade, é assim que tf.distribute.Strategy é implementado).
  • Uma 'task' fornece informações da tarefa atual e é diferente para cada trabalhador. Ele especifica o 'type' e 'index' desse trabalhador.

Abaixo segue um exemplo de configuração:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aqui está o mesmo TF_CONFIG serializado como uma string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Observe que tf_config é apenas uma variável local em Python. Para poder usá-lo para uma configuração de treinamento, este dict precisa ser serializado como um JSON e colocado em uma variável de ambiente TF_CONFIG .

Na configuração de exemplo acima, você define a tarefa 'type' como 'worker' e a tarefa 'index' como 0 . Portanto, esta máquina é o primeiro trabalhador. Ele será apontado como o trabalhador 'chief' e fará mais trabalho do que os outros.

Para fins de ilustração, este tutorial mostra como você pode configurar uma variável TF_CONFIG com dois workers em um localhost .

Na prática, você criaria vários trabalhadores em endereços/portas IP externas e definiria uma variável TF_CONFIG em cada trabalhador de acordo.

Neste tutorial, você usará dois trabalhadores:

  • O TF_CONFIG do primeiro trabalhador ( 'chief' ) é mostrado acima.
  • Para o segundo trabalhador, você definirá tf_config['task']['index']=1

Variáveis ​​de ambiente e subprocessos em notebooks

Os subprocessos herdam as variáveis ​​de ambiente de seu pai.

Por exemplo, você pode definir uma variável de ambiente neste processo do Jupyter Notebook da seguinte maneira:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Em seguida, você pode acessar a variável de ambiente de um subprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Na próxima seção, você usará um método semelhante para passar o TF_CONFIG para os subprocessos de trabalho. Em um cenário do mundo real, você não iniciaria seus trabalhos dessa maneira, mas é suficiente neste exemplo.

Escolha a estratégia certa

No TensorFlow, existem duas formas principais de treinamento distribuído:

  • Treinamento síncrono , onde as etapas do treinamento são sincronizadas entre os trabalhadores e réplicas, e
  • Treinamento assíncrono , em que as etapas de treinamento não são estritamente sincronizadas (por exemplo, treinamento do servidor de parâmetros ).

Este tutorial demonstra como realizar treinamento síncrono de vários trabalhadores usando uma instância de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy cria cópias de todas as variáveis ​​nas camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps , uma operação do TensorFlow para comunicação coletiva, para agregar gradientes e manter as variáveis ​​em sincronia. O guia tf.distribute.Strategy tem mais detalhes sobre essa estratégia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fornece várias implementações por meio do parâmetro tf.distribute.experimental.CommunicationOptions : 1) RING implementa coletivos baseados em anel usando gRPC como camada de comunicação entre hosts; 2) NCCL usa a NVIDIA Collective Communication Library para implementar coletivos; e 3) AUTO adia a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster.

Treine o modelo

Com a integração da API tf.distribute.Strategy em tf.keras , a única mudança que você fará para distribuir o treinamento para vários trabalhadores é incluir a construção do modelo e a chamada model.compile() dentro de strategy.scope() . O escopo da estratégia de distribuição determina como e onde as variáveis ​​são criadas e, no caso de MultiWorkerMirroredStrategy , as variáveis ​​criadas são MirroredVariable e são replicadas em cada um dos trabalhadores.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Para realmente executar com o MultiWorkerMirroredStrategy , você precisará executar processos de trabalho e passar um TF_CONFIG para eles.

Como o arquivo mnist_setup.py escrito anteriormente, aqui está o main.py que cada um dos workers executará:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

No trecho de código acima, observe que global_batch_size , que é passado para Dataset.batch , é definido como per_worker_batch_size * num_workers . Isso garante que cada trabalhador processe lotes de exemplos per_worker_batch_size , independentemente do número de trabalhadores.

O diretório atual agora contém os dois arquivos Python:

ls *.py
main.py
mnist_setup.py

Então json-serialize o TF_CONFIG e adicione-o às variáveis ​​de ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora, você pode iniciar um processo de trabalho que executará o main.py e usará o TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Há algumas coisas a serem observadas sobre o comando acima:

  1. Ele usa o %%bash que é uma "mágica" de notebook para executar alguns comandos do bash.
  2. Ele usa o sinalizador --bg para executar o processo bash em segundo plano, porque esse trabalhador não será encerrado. Ele espera por todos os trabalhadores antes de começar.

O processo de trabalho em segundo plano não imprimirá a saída neste notebook, então o &> redireciona sua saída para um arquivo para que você possa inspecionar o que aconteceu em um arquivo de log posteriormente.

Então, espere alguns segundos para o processo iniciar:

import time
time.sleep(10)

Agora, inspecione o que foi gerado no arquivo de log do trabalhador até agora:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345 . O primeiro trabalhador está agora pronto e está esperando que todos os outros trabalhadores estejam prontos para prosseguir.

Então atualize o tf_config para o processo do segundo trabalhador pegar:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Inicie o segundo trabalhador. Isso iniciará o treinamento, pois todos os trabalhadores estão ativos (portanto, não há necessidade de fazer esse processo em segundo plano):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Se você verificar novamente os logs escritos pelo primeiro trabalhador, descobrirá que ele participou do treinamento desse modelo:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Sem surpresa, isso foi mais lento do que o teste executado no início deste tutorial.

A execução de vários trabalhadores em uma única máquina apenas adiciona sobrecarga.

O objetivo aqui não era melhorar o tempo de treinamento, mas apenas dar um exemplo de treinamento multitrabalhador.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Treinamento de vários trabalhadores em profundidade

Até agora, você aprendeu como executar uma configuração básica de vários trabalhadores.

Durante o restante do tutorial, você aprenderá detalhadamente outros fatores que podem ser úteis ou importantes para casos de uso reais.

Fragmentação do conjunto de dados

No treinamento de vários trabalhadores, a fragmentação do conjunto de dados é necessária para garantir a convergência e o desempenho.

O exemplo na seção anterior se baseia no autosharding padrão fornecido pela API tf.distribute.Strategy . Você pode controlar a fragmentação definindo o tf.data.experimental.AutoShardPolicy do tf.data.experimental.DistributeOptions .

Para saber mais sobre a fragmentação automática , consulte o guia de entrada distribuída .

Aqui está um exemplo rápido de como desativar a fragmentação automática, para que cada réplica processe todos os exemplos ( não recomendado ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Avaliação

Se você passar o validation_data para Model.fit , ele alternará entre treinamento e avaliação para cada época. A avaliação que utiliza o validation_data é distribuída no mesmo conjunto de workers e os resultados da avaliação são agregados e disponibilizados para todos os workers.

Semelhante ao treinamento, o conjunto de dados de validação é fragmentado automaticamente no nível do arquivo. Você precisa definir um tamanho de lote global no conjunto de dados de validação e definir o validation_steps .

Um conjunto de dados repetido também é recomendado para avaliação.

Como alternativa, você também pode criar outra tarefa que leia periodicamente os pontos de verificação e execute a avaliação. Isso é o que o Estimator faz. Mas esta não é uma forma recomendada de realizar a avaliação e, portanto, seus detalhes são omitidos.

Desempenho

Agora você tem um modelo Keras que está configurado para ser executado em vários workers com o MultiWorkerMirroredStrategy .

Para ajustar o desempenho do treinamento de vários trabalhadores, você pode tentar o seguinte:

  • tf.distribute.MultiWorkerMirroredStrategy fornece várias implementações de comunicação coletiva :

    • O RING implementa coletivos baseados em anel usando gRPC como a camada de comunicação entre hosts.
    • A NCCL usa a NVIDIA Collective Communication Library para implementar coletivos.
    • AUTO adia a escolha para o tempo de execução.

    A melhor escolha de implementação coletiva depende do número de GPUs, do tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique o parâmetro communication_options do construtor de MultiWorkerMirroredStrategy . Por exemplo:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Transmita as variáveis ​​para tf.float se possível:

    • O modelo oficial da ResNet inclui um exemplo de como isso pode ser feito.

Tolerância ao erro

No treinamento síncrono, o cluster falharia se um dos trabalhadores falhar e não existir nenhum mecanismo de recuperação de falhas.

Usar Keras com tf.distribute.Strategy vem com a vantagem de tolerância a falhas nos casos em que os trabalhadores morrem ou ficam instáveis. Você pode fazer isso preservando o estado de treinamento no sistema de arquivos distribuído de sua escolha, de modo que, após uma reinicialização da instância que falhou ou preempção anteriormente, o estado de treinamento seja recuperado.

Quando um trabalhador fica indisponível, outros trabalhadores falharão (possivelmente após um tempo limite). Nesses casos, o trabalhador indisponível precisa ser reiniciado, assim como outros trabalhadores que falharam.

Retorno de chamada ModelCheckpoint

O retorno de chamada ModelCheckpoint não fornece mais a funcionalidade de tolerância a falhas. Em vez disso, use o retorno de chamada BackupAndRestore .

O retorno de chamada ModelCheckpoint ainda pode ser usado para salvar pontos de verificação. Mas com isso, se o treinamento foi interrompido ou finalizado com sucesso, para continuar o treinamento a partir do checkpoint, o usuário é responsável por carregar o modelo manualmente.

Opcionalmente, o usuário pode optar por salvar e restaurar modelo/pesos fora do retorno de chamada ModelCheckpoint .

Salvamento e carregamento do modelo

Para salvar seu modelo usando model.save ou tf.saved_model.save , o destino do salvamento precisa ser diferente para cada trabalhador.

  • Para trabalhadores não-chefes, você precisará salvar o modelo em um diretório temporário.
  • Para o chefe, você precisará salvar no diretório do modelo fornecido.

Os diretórios temporários no trabalhador precisam ser exclusivos para evitar erros resultantes de vários trabalhadores tentando gravar no mesmo local.

O modelo salvo em todos os diretórios é idêntico e, normalmente, apenas o modelo salvo pelo chefe deve ser referenciado para restauração ou atendimento.

Você deve ter alguma lógica de limpeza que exclua os diretórios temporários criados pelos trabalhadores após a conclusão do treinamento.

A razão para economizar no chefe e nos trabalhadores ao mesmo tempo é porque você pode estar agregando variáveis ​​durante o checkpoint, o que exige que o chefe e os trabalhadores participem do protocolo de comunicação allreduce. Por outro lado, permitir que o chefe e os trabalhadores salvem no mesmo diretório de modelo resultará em erros devido à contenção.

Usando o MultiWorkerMirroredStrategy , o programa é executado em todos os trabalhadores e, para saber se o trabalhador atual é o chefe, ele aproveita o objeto resolvedor de cluster que possui os atributos task_type e task_id :

  • task_type informa qual é o trabalho atual (por exemplo 'worker' ).
  • task_id informa o identificador do trabalhador.
  • O trabalhador com task_id == 0 é designado como o trabalhador chefe.

No trecho de código abaixo, a função write_filepath fornece o caminho do arquivo a ser gravado, que depende do task_id do trabalhador:

  • Para o trabalhador chefe (com task_id == 0 ), ele grava no caminho do arquivo original.
  • Para outros trabalhadores, ele cria um diretório temporário— temp_dir —com task_id no caminho do diretório para escrever:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Com isso, você está pronto para salvar:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Conforme descrito acima, mais tarde o modelo deve ser carregado apenas do caminho para o qual o chefe foi salvo, então vamos remover os temporários que os trabalhadores não-chefes salvaram:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Agora, na hora de carregar, vamos usar a conveniente API tf.keras.models.load_model e continuar com o trabalho adicional.

Aqui, suponha que use apenas um único trabalhador para carregar e continuar o treinamento; nesse caso, você não chama tf.keras.models.load_model dentro de outro strategy.scope() (observe que strategy = tf.distribute.MultiWorkerMirroredStrategy() , conforme definido anteriormente ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Salvar e restaurar pontos de verificação

Por outro lado, o checkpointing permite salvar os pesos do seu modelo e restaurá-los sem precisar salvar o modelo inteiro.

Aqui, você criará um tf.train.Checkpoint que rastreia o modelo, que é gerenciado pelo tf.train.CheckpointManager , para que apenas o último checkpoint seja preservado:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Depois que o CheckpointManager estiver configurado, você estará pronto para salvar e remover os pontos de verificação que os trabalhadores não-chefes salvaram:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Agora, quando você precisar restaurar o modelo, poderá encontrar o último checkpoint salvo usando a conveniente função tf.train.latest_checkpoint . Depois de restaurar o ponto de verificação, você pode continuar com o treinamento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Retorno de chamada BackupAndRestore

O retorno de chamada tf.keras.callbacks.BackupAndRestore fornece a funcionalidade de tolerância a falhas fazendo backup do modelo e do número de época atual em um arquivo de ponto de verificação temporário no argumento backup_dir para BackupAndRestore . Isso é feito no final de cada época.

Depois que os trabalhos são interrompidos e reiniciados, o retorno de chamada restaura o último ponto de verificação e o treinamento continua desde o início da época interrompida. Qualquer treinamento parcial já feito na época inacabada antes da interrupção será descartado, para que não afete o estado final do modelo.

Para usá-lo, forneça uma instância de tf.keras.callbacks.BackupAndRestore na chamada Model.fit .

Com MultiWorkerMirroredStrategy , se um trabalhador for interrompido, todo o cluster pausará até que o trabalhador interrompido seja reiniciado. Outros trabalhadores também serão reiniciados e o trabalhador interrompido reingressa no cluster. Em seguida, cada trabalhador lê o arquivo de ponto de verificação que foi salvo anteriormente e recupera seu estado anterior, permitindo assim que o cluster volte a sincronizar. Então, o treinamento continua.

O retorno de chamada BackupAndRestore usa o CheckpointManager para salvar e restaurar o estado de treinamento, que gera um arquivo chamado checkpoint que rastreia os checkpoints existentes junto com o mais recente. Por esse motivo, backup_dir não deve ser reutilizado para armazenar outros pontos de verificação para evitar colisão de nomes.

Atualmente, o retorno de chamada BackupAndRestore oferece suporte ao treinamento de um único funcionário sem estratégia — MirroredStrategy — e treinamento de vários funcionários com o MultiWorkerMirroredStrategy .

Abaixo estão dois exemplos para treinamento de vários trabalhadores e treinamento de um único trabalhador:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Se você inspecionar o diretório de backup_dir especificado em BackupAndRestore , poderá observar alguns arquivos de ponto de verificação gerados temporariamente. Esses arquivos são necessários para recuperar as instâncias perdidas anteriormente e serão removidos pela biblioteca no final do Model.fit após a saída bem-sucedida do seu treinamento.

Recursos adicionais

  1. O guia Treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
  2. O tutorial Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy mostra como usar o MultiWorkerMirroredStrategy com Keras e um loop de treinamento personalizado.
  3. Confira os modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
  4. O guia Melhor desempenho com tf.function fornece informações sobre outras estratégias e ferramentas, como o TensorFlow Profiler , que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.