Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Visão geral
Este tutorial demonstra como realizar treinamento distribuído para vários trabalhadores com um modelo Keras e a API Model.fit
usando a API tf.distribute.Strategy
— especificamente a classe tf.distribute.MultiWorkerMirroredStrategy
. Com a ajuda dessa estratégia, um modelo Keras projetado para ser executado em um único trabalhador pode funcionar perfeitamente em vários trabalhadores com alterações mínimas de código.
Para aqueles interessados em uma compreensão mais profunda das APIs tf.distribute.Strategy
, o guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição suportadas pelo TensorFlow.
Para saber como usar o MultiWorkerMirroredStrategy
com Keras e um loop de treinamento personalizado, consulte Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy .
Observe que o objetivo deste tutorial é demonstrar um exemplo mínimo de vários trabalhadores com dois trabalhadores.
Configurar
Comece com algumas importações necessárias:
import json
import os
import sys
Antes de importar o TensorFlow, faça algumas alterações no ambiente:
- Desative todas as GPUs. Isso evita erros causados por todos os trabalhadores tentando usar a mesma GPU. Em uma aplicação do mundo real, cada trabalhador estaria em uma máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- Redefina a variável de ambiente
TF_CONFIG
(você aprenderá mais sobre isso mais tarde):
os.environ.pop('TF_CONFIG', None)
- Certifique-se de que o diretório atual esteja no caminho do Python—isso permite que o notebook importe os arquivos escritos por
%%writefile
posteriormente:
if '.' not in sys.path:
sys.path.insert(0, '.')
Agora importe o TensorFlow:
import tensorflow as tf
Definição de conjunto de dados e modelo
Em seguida, crie um arquivo mnist_setup.py
com uma configuração simples de modelo e conjunto de dados. Este arquivo Python será usado pelos processos de trabalho neste tutorial:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
Treinamento de modelo em um único trabalhador
Tente treinar o modelo por um pequeno número de épocas e observe os resultados de um único trabalhador para garantir que tudo funcione corretamente. À medida que o treinamento progride, a perda deve cair e a precisão deve aumentar.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
Configuração de vários trabalhadores
Agora vamos entrar no mundo do treinamento de vários trabalhadores.
Um cluster com jobs e tarefas
No TensorFlow, o treinamento distribuído envolve: um 'cluster'
com várias tarefas, e cada uma das tarefas pode ter uma ou mais 'task'
.
Você precisará da variável de ambiente de configuração TF_CONFIG
para treinar em várias máquinas, cada uma das quais possivelmente com uma função diferente. TF_CONFIG
é uma string JSON usada para especificar a configuração do cluster para cada trabalhador que faz parte do cluster.
Existem dois componentes de uma variável TF_CONFIG
: 'cluster'
e 'task'
.
Um
'cluster'
é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de treinamento, que é um dict composto por diferentes tipos de empregos, como'worker'
ou'chief'
.- No treinamento de vários trabalhadores com
tf.distribute.MultiWorkerMirroredStrategy
, geralmente há um'worker'
que assume responsabilidades, como salvar um ponto de verificação e escrever um arquivo de resumo para o TensorBoard, além do que um'worker'
normal faz. Tal'worker'
é referido como o trabalhador chefe (com um nome de trabalho'chief'
). - É costume que o
'chief'
tenha'index'
0
para ser nomeado (na verdade, é assim quetf.distribute.Strategy
é implementado).
- No treinamento de vários trabalhadores com
Uma
'task'
fornece informações da tarefa atual e é diferente para cada trabalhador. Ele especifica o'type'
e'index'
desse trabalhador.
Abaixo segue um exemplo de configuração:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Aqui está o mesmo TF_CONFIG
serializado como uma string JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Observe que tf_config
é apenas uma variável local em Python. Para poder usá-lo para uma configuração de treinamento, este dict precisa ser serializado como um JSON e colocado em uma variável de ambiente TF_CONFIG
.
Na configuração de exemplo acima, você define a tarefa 'type'
como 'worker'
e a tarefa 'index'
como 0
. Portanto, esta máquina é o primeiro trabalhador. Ele será apontado como o trabalhador 'chief'
e fará mais trabalho do que os outros.
Para fins de ilustração, este tutorial mostra como você pode configurar uma variável TF_CONFIG
com dois workers em um localhost
.
Na prática, você criaria vários trabalhadores em endereços/portas IP externas e definiria uma variável TF_CONFIG
em cada trabalhador de acordo.
Neste tutorial, você usará dois trabalhadores:
- O
TF_CONFIG
do primeiro trabalhador ('chief'
) é mostrado acima. - Para o segundo trabalhador, você definirá
tf_config['task']['index']=1
Variáveis de ambiente e subprocessos em notebooks
Os subprocessos herdam as variáveis de ambiente de seu pai.
Por exemplo, você pode definir uma variável de ambiente neste processo do Jupyter Notebook da seguinte maneira:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Em seguida, você pode acessar a variável de ambiente de um subprocesso:
echo ${GREETINGS}
Hello TensorFlow!
Na próxima seção, você usará um método semelhante para passar o TF_CONFIG
para os subprocessos de trabalho. Em um cenário do mundo real, você não iniciaria seus trabalhos dessa maneira, mas é suficiente neste exemplo.
Escolha a estratégia certa
No TensorFlow, existem duas formas principais de treinamento distribuído:
- Treinamento síncrono , onde as etapas do treinamento são sincronizadas entre os trabalhadores e réplicas, e
- Treinamento assíncrono , em que as etapas de treinamento não são estritamente sincronizadas (por exemplo, treinamento do servidor de parâmetros ).
Este tutorial demonstra como realizar treinamento síncrono de vários trabalhadores usando uma instância de tf.distribute.MultiWorkerMirroredStrategy
.
MultiWorkerMirroredStrategy
cria cópias de todas as variáveis nas camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps
, uma operação do TensorFlow para comunicação coletiva, para agregar gradientes e manter as variáveis em sincronia. O guia tf.distribute.Strategy
tem mais detalhes sobre essa estratégia.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
fornece várias implementações por meio do parâmetro tf.distribute.experimental.CommunicationOptions
: 1) RING
implementa coletivos baseados em anel usando gRPC como camada de comunicação entre hosts; 2) NCCL
usa a NVIDIA Collective Communication Library para implementar coletivos; e 3) AUTO
adia a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster.
Treine o modelo
Com a integração da API tf.distribute.Strategy
em tf.keras
, a única mudança que você fará para distribuir o treinamento para vários trabalhadores é incluir a construção do modelo e a chamada model.compile()
dentro de strategy.scope()
. O escopo da estratégia de distribuição determina como e onde as variáveis são criadas e, no caso de MultiWorkerMirroredStrategy
, as variáveis criadas são MirroredVariable
e são replicadas em cada um dos trabalhadores.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
Para realmente executar com o MultiWorkerMirroredStrategy
, você precisará executar processos de trabalho e passar um TF_CONFIG
para eles.
Como o arquivo mnist_setup.py
escrito anteriormente, aqui está o main.py
que cada um dos workers executará:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
No trecho de código acima, observe que global_batch_size
, que é passado para Dataset.batch
, é definido como per_worker_batch_size * num_workers
. Isso garante que cada trabalhador processe lotes de exemplos per_worker_batch_size
, independentemente do número de trabalhadores.
O diretório atual agora contém os dois arquivos Python:
ls *.py
main.py mnist_setup.py
Então json-serialize o TF_CONFIG
e adicione-o às variáveis de ambiente:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Agora, você pode iniciar um processo de trabalho que executará o main.py
e usará o TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Há algumas coisas a serem observadas sobre o comando acima:
- Ele usa o
%%bash
que é uma "mágica" de notebook para executar alguns comandos do bash. - Ele usa o sinalizador
--bg
para executar o processobash
em segundo plano, porque esse trabalhador não será encerrado. Ele espera por todos os trabalhadores antes de começar.
O processo de trabalho em segundo plano não imprimirá a saída neste notebook, então o &>
redireciona sua saída para um arquivo para que você possa inspecionar o que aconteceu em um arquivo de log posteriormente.
Então, espere alguns segundos para o processo iniciar:
import time
time.sleep(10)
Agora, inspecione o que foi gerado no arquivo de log do trabalhador até agora:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345
. O primeiro trabalhador está agora pronto e está esperando que todos os outros trabalhadores estejam prontos para prosseguir.
Então atualize o tf_config
para o processo do segundo trabalhador pegar:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Inicie o segundo trabalhador. Isso iniciará o treinamento, pois todos os trabalhadores estão ativos (portanto, não há necessidade de fazer esse processo em segundo plano):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Se você verificar novamente os logs escritos pelo primeiro trabalhador, descobrirá que ele participou do treinamento desse modelo:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
Sem surpresa, isso foi mais lento do que o teste executado no início deste tutorial.
A execução de vários trabalhadores em uma única máquina apenas adiciona sobrecarga.
O objetivo aqui não era melhorar o tempo de treinamento, mas apenas dar um exemplo de treinamento multitrabalhador.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Treinamento de vários trabalhadores em profundidade
Até agora, você aprendeu como executar uma configuração básica de vários trabalhadores.
Durante o restante do tutorial, você aprenderá detalhadamente outros fatores que podem ser úteis ou importantes para casos de uso reais.
Fragmentação do conjunto de dados
No treinamento de vários trabalhadores, a fragmentação do conjunto de dados é necessária para garantir a convergência e o desempenho.
O exemplo na seção anterior se baseia no autosharding padrão fornecido pela API tf.distribute.Strategy
. Você pode controlar a fragmentação definindo o tf.data.experimental.AutoShardPolicy
do tf.data.experimental.DistributeOptions
.
Para saber mais sobre a fragmentação automática , consulte o guia de entrada distribuída .
Aqui está um exemplo rápido de como desativar a fragmentação automática, para que cada réplica processe todos os exemplos ( não recomendado ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
Avaliação
Se você passar o validation_data
para Model.fit
, ele alternará entre treinamento e avaliação para cada época. A avaliação que utiliza o validation_data
é distribuída no mesmo conjunto de workers e os resultados da avaliação são agregados e disponibilizados para todos os workers.
Semelhante ao treinamento, o conjunto de dados de validação é fragmentado automaticamente no nível do arquivo. Você precisa definir um tamanho de lote global no conjunto de dados de validação e definir o validation_steps
.
Um conjunto de dados repetido também é recomendado para avaliação.
Como alternativa, você também pode criar outra tarefa que leia periodicamente os pontos de verificação e execute a avaliação. Isso é o que o Estimator faz. Mas esta não é uma forma recomendada de realizar a avaliação e, portanto, seus detalhes são omitidos.
Desempenho
Agora você tem um modelo Keras que está configurado para ser executado em vários workers com o MultiWorkerMirroredStrategy
.
Para ajustar o desempenho do treinamento de vários trabalhadores, você pode tentar o seguinte:
tf.distribute.MultiWorkerMirroredStrategy
fornece várias implementações de comunicação coletiva :- O
RING
implementa coletivos baseados em anel usando gRPC como a camada de comunicação entre hosts. - A
NCCL
usa a NVIDIA Collective Communication Library para implementar coletivos. -
AUTO
adia a escolha para o tempo de execução.
A melhor escolha de implementação coletiva depende do número de GPUs, do tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique o parâmetro
communication_options
do construtor deMultiWorkerMirroredStrategy
. Por exemplo:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
- O
Transmita as variáveis para
tf.float
se possível:- O modelo oficial da ResNet inclui um exemplo de como isso pode ser feito.
Tolerância ao erro
No treinamento síncrono, o cluster falharia se um dos trabalhadores falhar e não existir nenhum mecanismo de recuperação de falhas.
Usar Keras com tf.distribute.Strategy
vem com a vantagem de tolerância a falhas nos casos em que os trabalhadores morrem ou ficam instáveis. Você pode fazer isso preservando o estado de treinamento no sistema de arquivos distribuído de sua escolha, de modo que, após uma reinicialização da instância que falhou ou preempção anteriormente, o estado de treinamento seja recuperado.
Quando um trabalhador fica indisponível, outros trabalhadores falharão (possivelmente após um tempo limite). Nesses casos, o trabalhador indisponível precisa ser reiniciado, assim como outros trabalhadores que falharam.
Retorno de chamada ModelCheckpoint
O retorno de chamada ModelCheckpoint
não fornece mais a funcionalidade de tolerância a falhas. Em vez disso, use o retorno de chamada BackupAndRestore
.
O retorno de chamada ModelCheckpoint
ainda pode ser usado para salvar pontos de verificação. Mas com isso, se o treinamento foi interrompido ou finalizado com sucesso, para continuar o treinamento a partir do checkpoint, o usuário é responsável por carregar o modelo manualmente.
Opcionalmente, o usuário pode optar por salvar e restaurar modelo/pesos fora do retorno de chamada ModelCheckpoint
.
Salvamento e carregamento do modelo
Para salvar seu modelo usando model.save
ou tf.saved_model.save
, o destino do salvamento precisa ser diferente para cada trabalhador.
- Para trabalhadores não-chefes, você precisará salvar o modelo em um diretório temporário.
- Para o chefe, você precisará salvar no diretório do modelo fornecido.
Os diretórios temporários no trabalhador precisam ser exclusivos para evitar erros resultantes de vários trabalhadores tentando gravar no mesmo local.
O modelo salvo em todos os diretórios é idêntico e, normalmente, apenas o modelo salvo pelo chefe deve ser referenciado para restauração ou atendimento.
Você deve ter alguma lógica de limpeza que exclua os diretórios temporários criados pelos trabalhadores após a conclusão do treinamento.
A razão para economizar no chefe e nos trabalhadores ao mesmo tempo é porque você pode estar agregando variáveis durante o checkpoint, o que exige que o chefe e os trabalhadores participem do protocolo de comunicação allreduce. Por outro lado, permitir que o chefe e os trabalhadores salvem no mesmo diretório de modelo resultará em erros devido à contenção.
Usando o MultiWorkerMirroredStrategy
, o programa é executado em todos os trabalhadores e, para saber se o trabalhador atual é o chefe, ele aproveita o objeto resolvedor de cluster que possui os atributos task_type
e task_id
:
-
task_type
informa qual é o trabalho atual (por exemplo'worker'
). -
task_id
informa o identificador do trabalhador. - O trabalhador com
task_id == 0
é designado como o trabalhador chefe.
No trecho de código abaixo, a função write_filepath
fornece o caminho do arquivo a ser gravado, que depende do task_id
do trabalhador:
- Para o trabalhador chefe (com
task_id == 0
), ele grava no caminho do arquivo original. - Para outros trabalhadores, ele cria um diretório temporário—
temp_dir
—comtask_id
no caminho do diretório para escrever:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
Com isso, você está pronto para salvar:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
Conforme descrito acima, mais tarde o modelo deve ser carregado apenas do caminho para o qual o chefe foi salvo, então vamos remover os temporários que os trabalhadores não-chefes salvaram:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
Agora, na hora de carregar, vamos usar a conveniente API tf.keras.models.load_model
e continuar com o trabalho adicional.
Aqui, suponha que use apenas um único trabalhador para carregar e continuar o treinamento; nesse caso, você não chama tf.keras.models.load_model
dentro de outro strategy.scope()
(observe que strategy = tf.distribute.MultiWorkerMirroredStrategy()
, conforme definido anteriormente ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
Salvar e restaurar pontos de verificação
Por outro lado, o checkpointing permite salvar os pesos do seu modelo e restaurá-los sem precisar salvar o modelo inteiro.
Aqui, você criará um tf.train.Checkpoint
que rastreia o modelo, que é gerenciado pelo tf.train.CheckpointManager
, para que apenas o último checkpoint seja preservado:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Depois que o CheckpointManager
estiver configurado, você estará pronto para salvar e remover os pontos de verificação que os trabalhadores não-chefes salvaram:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
Agora, quando você precisar restaurar o modelo, poderá encontrar o último checkpoint salvo usando a conveniente função tf.train.latest_checkpoint
. Depois de restaurar o ponto de verificação, você pode continuar com o treinamento.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
Retorno de chamada BackupAndRestore
O retorno de chamada tf.keras.callbacks.BackupAndRestore
fornece a funcionalidade de tolerância a falhas fazendo backup do modelo e do número de época atual em um arquivo de ponto de verificação temporário no argumento backup_dir
para BackupAndRestore
. Isso é feito no final de cada época.
Depois que os trabalhos são interrompidos e reiniciados, o retorno de chamada restaura o último ponto de verificação e o treinamento continua desde o início da época interrompida. Qualquer treinamento parcial já feito na época inacabada antes da interrupção será descartado, para que não afete o estado final do modelo.
Para usá-lo, forneça uma instância de tf.keras.callbacks.BackupAndRestore
na chamada Model.fit
.
Com MultiWorkerMirroredStrategy
, se um trabalhador for interrompido, todo o cluster pausará até que o trabalhador interrompido seja reiniciado. Outros trabalhadores também serão reiniciados e o trabalhador interrompido reingressa no cluster. Em seguida, cada trabalhador lê o arquivo de ponto de verificação que foi salvo anteriormente e recupera seu estado anterior, permitindo assim que o cluster volte a sincronizar. Então, o treinamento continua.
O retorno de chamada BackupAndRestore
usa o CheckpointManager
para salvar e restaurar o estado de treinamento, que gera um arquivo chamado checkpoint que rastreia os checkpoints existentes junto com o mais recente. Por esse motivo, backup_dir
não deve ser reutilizado para armazenar outros pontos de verificação para evitar colisão de nomes.
Atualmente, o retorno de chamada BackupAndRestore
oferece suporte ao treinamento de um único funcionário sem estratégia — MirroredStrategy
— e treinamento de vários funcionários com o MultiWorkerMirroredStrategy
.
Abaixo estão dois exemplos para treinamento de vários trabalhadores e treinamento de um único trabalhador:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
Se você inspecionar o diretório de backup_dir
especificado em BackupAndRestore
, poderá observar alguns arquivos de ponto de verificação gerados temporariamente. Esses arquivos são necessários para recuperar as instâncias perdidas anteriormente e serão removidos pela biblioteca no final do Model.fit
após a saída bem-sucedida do seu treinamento.
Recursos adicionais
- O guia Treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
- O tutorial Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy mostra como usar o
MultiWorkerMirroredStrategy
com Keras e um loop de treinamento personalizado. - Confira os modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
- O guia Melhor desempenho com tf.function fornece informações sobre outras estratégias e ferramentas, como o TensorFlow Profiler , que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.