Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Visão geral
O treinamento do servidor de parâmetros é um método comum de dados paralelos para aumentar o treinamento do modelo em várias máquinas.
Um cluster de treinamento do servidor de parâmetros consiste em trabalhadores e servidores de parâmetros . As variáveis são criadas em servidores de parâmetros e são lidas e atualizadas pelos trabalhadores em cada etapa. Por padrão, os workers leem e atualizam essas variáveis de forma independente, sem sincronizar entre si. É por isso que, às vezes, o treinamento no estilo do servidor de parâmetros é chamado de treinamento assíncrono .
No TensorFlow 2, o treinamento do servidor de parâmetros é desenvolvido pela classe tf.distribute.experimental.ParameterServerStrategy
, que distribui as etapas de treinamento para um cluster que pode ser dimensionado para milhares de trabalhadores (acompanhados por servidores de parâmetros).
Métodos de treinamento suportados
Existem dois principais métodos de treinamento suportados:
- A API Keras
Model.fit
, que é recomendada quando você prefere uma abstração e manipulação de treinamento de alto nível. - Um loop de treinamento personalizado (você pode consultar Treinamento personalizado , Escrevendo um loop de treinamento do zero e Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy para obter mais detalhes.) O treinamento de loop personalizado é recomendado quando você preferir definir os detalhes de seu loop de treinamento.
Um cluster com jobs e tarefas
Independentemente da API escolhida ( Model.fit
ou um loop de treinamento personalizado), o treinamento distribuído no TensorFlow 2 envolve: um 'cluster'
com vários 'jobs'
e cada um dos jobs pode ter uma ou mais 'tasks'
.
Ao usar o treinamento do servidor de parâmetros, é recomendável ter:
- Um trabalho de coordenador (que tem o nome de trabalho
chief
) - Vários trabalhos de trabalhador (nome do trabalho
worker
); e - Trabalhos de servidor de vários parâmetros (nome do trabalho
ps
)
Enquanto o coordenador cria recursos, despacha tarefas de treinamento, escreve pontos de verificação e lida com falhas de tarefas, trabalhadores e servidores de parâmetros executam o tf.distribute.Server
que escuta as solicitações do coordenador.
Treinamento do servidor de parâmetros com a API Model.fit
O treinamento do servidor de parâmetros com a API Model.fit
requer que o coordenador use um objeto tf.distribute.experimental.ParameterServerStrategy
e um tf.keras.utils.experimental.DatasetCreator
como entrada. Semelhante ao uso de Model.fit
sem estratégia ou com outras estratégias, o fluxo de trabalho envolve a criação e compilação do modelo, preparando os retornos de chamada, seguidos por uma chamada de Model.fit
.
Treinamento do servidor de parâmetros com um loop de treinamento personalizado
Com loops de treinamento personalizados, a classe tf.distribute.experimental.coordinator.ClusterCoordinator
é o principal componente usado para o coordenador.
- A classe
ClusterCoordinator
precisa trabalhar em conjunto com um objetotf.distribute.Strategy
. - Esse objeto
tf.distribute.Strategy
é necessário para fornecer as informações do cluster e é usado para definir uma etapa de treinamento, conforme demonstrado em Treinamento personalizado com tf.distribute.Strategy . - O objeto
ClusterCoordinator
então despacha a execução dessas etapas de treinamento para trabalhadores remotos. - Para treinamento do servidor de parâmetros, o
ClusterCoordinator
precisa trabalhar com umtf.distribute.experimental.ParameterServerStrategy
.
A API mais importante fornecida pelo objeto ClusterCoordinator
é schedule
:
- A API de
schedule
enfileira umtf.function
e retorna umRemoteValue
semelhante ao futuro imediatamente. - As funções enfileiradas serão despachadas para trabalhadores remotos em threads em segundo plano e seus
RemoteValue
s serão preenchidos de forma assíncrona. - Como o
schedule
não requer atribuição de trabalhador, otf.function
passado pode ser executado em qualquer trabalhador disponível. - Se o trabalhador em que ela é executada ficar indisponível antes de sua conclusão, a função será repetida em outro trabalhador disponível.
- Devido a esse fato e ao fato de que a execução da função não é atômica, uma função pode ser executada mais de uma vez.
Além de despachar funções remotas, o ClusterCoordinator
também ajuda a criar conjuntos de dados em todos os trabalhadores e reconstruir esses conjuntos de dados quando um trabalhador se recupera de uma falha.
Configuração do tutorial
O tutorial se ramificará em Model.fit
e caminhos de loop de treinamento personalizados, e você pode escolher aquele que atende às suas necessidades. Seções diferentes de "Treinando com X" são aplicáveis a ambos os caminhos.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Configuração do cluster
Conforme mencionado acima, um cluster de treinamento de servidor de parâmetros requer uma tarefa de coordenador que executa seu programa de treinamento, um ou vários workers e tarefas de servidor de parâmetros que executam servidores tf.distribute.Server
— tf.distribute.Server — e possivelmente uma tarefa de avaliação adicional que executa avaliação secundária (veja a seção de avaliação do carro lateral abaixo). Os requisitos para configurá-los são:
- A tarefa do coordenador precisa conhecer os endereços e as portas de todos os outros servidores TensorFlow, exceto o avaliador.
- Os workers e os servidores de parâmetros precisam saber em qual porta eles precisam escutar. Para simplificar, geralmente você pode passar as informações completas do cluster ao criar servidores TensorFlow nessas tarefas.
- A tarefa do avaliador não precisa conhecer a configuração do cluster de treinamento. Se isso acontecer, ele não deve tentar se conectar ao cluster de treinamento.
- Os workers e os servidores de parâmetros devem ter tipos de tarefas como
"worker"
e"ps"
, respectivamente. O coordenador deve usar"chief"
como o tipo de tarefa por motivos de legado.
Neste tutorial, você criará um cluster em processo para que todo o treinamento do servidor de parâmetros possa ser executado no Colab. Você aprenderá como configurar clusters reais em uma seção posterior.
Cluster em processo
Você começará criando vários servidores TensorFlow com antecedência e se conectará a eles posteriormente. Observe que isso é apenas para fins de demonstração deste tutorial, e em treinamento real os servidores serão iniciados em máquinas "worker"
e "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
A configuração de cluster em processo é frequentemente usada em testes de unidade, como aqui .
Outra opção para teste local é iniciar processos na máquina local - confira Treinamento de vários trabalhadores com Keras para obter um exemplo dessa abordagem.
Instanciar um ParameterServerStrategy
Antes de mergulhar no código de treinamento, vamos instanciar um objeto ParameterServerStrategy
. Observe que isso é necessário independentemente de você estar procedendo com Model.fit
ou um loop de treinamento personalizado. O argumento variable_partitioner
será explicado na seção Variable sharding .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Para usar GPUs para treinamento, aloque GPUs visíveis para cada trabalhador. O ParameterServerStrategy
usará todas as GPUs disponíveis em cada trabalhador, com a restrição de que todos os trabalhadores devem ter o mesmo número de GPUs disponíveis.
Fragmentação variável
A fragmentação de variável refere-se à divisão de uma variável em várias variáveis menores, que são chamadas de fragmentos . A fragmentação variável pode ser útil para distribuir a carga da rede ao acessar esses fragmentos. Também é útil distribuir computação e armazenamento de uma variável normal em vários servidores de parâmetros.
Para habilitar a fragmentação de variável, você pode passar um variable_partitioner
ao construir um objeto ParameterServerStrategy
. O variable_partitioner
será invocado sempre que uma variável for criada e espera-se que retorne o número de shards ao longo de cada dimensão da variável. Alguns variable_partitioner
s prontos para uso são fornecidos, como tf.distribute.experimental.partitioners.MinSizePartitioner
. Recomenda-se usar particionadores baseados em tamanho, como tf.distribute.experimental.partitioners.MinSizePartitioner
para evitar particionar variáveis pequenas, que podem ter um impacto negativo na velocidade de treinamento do modelo.
Quando um variable_partitioner
é passado e se você criar uma variável diretamente sob strategy.scope()
, ela se tornará um tipo de contêiner com uma propriedade variables
que fornece acesso à lista de shards. Na maioria dos casos, esse contêiner será convertido automaticamente em um tensor concatenando todos os shards. Como resultado, ela pode ser usada como uma variável normal. Por outro lado, alguns métodos do TensorFlow, como tf.nn.embedding_lookup
, fornecem implementação eficiente para esse tipo de contêiner e, nesses métodos, a concatenação automática será evitada.
Consulte os documentos da API de tf.distribute.experimental.ParameterServerStrategy
para obter mais detalhes.
Treinamento com Model.fit
O Keras fornece uma API de treinamento fácil de usar via Model.fit
que lida com o loop de treinamento sob o capô, com a flexibilidade do train_step
substituível e retornos de chamada, que fornecem funcionalidades como salvar pontos de verificação ou salvar resumos para o TensorBoard. Com Model.fit
, o mesmo código de treinamento pode ser usado para outras estratégias com uma simples troca do objeto de estratégia.
Dados de entrada
Model.fit
com treinamento de servidor de parâmetros requer que os dados de entrada sejam fornecidos em um callable que receba um único argumento do tipo tf.distribute.InputContext
e retorne um tf.data.Dataset
. Em seguida, crie um objeto tf.keras.utils.experimental.DatasetCreator
que receba tal callable
e um objeto tf.distribute.InputOptions
opcional por meio do argumento input_options
.
Observe que é recomendável embaralhar e repetir os dados com o treinamento do servidor de parâmetros e especificar steps_per_epoch
na chamada de fit
para que a biblioteca conheça os limites da época.
Consulte o tutorial de entrada distribuída para obter mais informações sobre o argumento InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
O código em dataset_fn
será invocado no dispositivo de entrada, que geralmente é a CPU, em cada uma das máquinas de trabalho.
Construção e compilação de modelos
Agora, você criará um tf.keras.Model
— um modelo tf.keras.models.Sequential
trivial para fins de demonstração — seguido por uma chamada Model.compile
para incorporar componentes, como um otimizador, métricas ou parâmetros como steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Retorno de chamadas e treinamento
Antes de chamar model.fit
para o treinamento real, vamos preparar os retornos de chamada necessários para tarefas comuns, como:
-
ModelCheckpoint
: para salvar os pesos do modelo. -
BackupAndRestore
: para garantir que o progresso do treinamento seja automaticamente copiado e recuperado se o cluster apresentar indisponibilidade (como anulação ou preempção); ou -
TensorBoard
: para salvar os relatórios de progresso em arquivos de resumo, que são visualizados na ferramenta TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Uso direto com ClusterCoordinator
(opcional)
Mesmo se você escolher o caminho de treinamento Model.fit
, você pode opcionalmente instanciar um objeto tf.distribute.experimental.coordinator.ClusterCoordinator
para agendar outras funções que você gostaria que fossem executadas nos workers. Consulte a seção Treinamento com um loop de treinamento personalizado para obter mais detalhes e exemplos.
Treinamento com um loop de treinamento personalizado
O uso de loops de treinamento personalizados com tf.distribute.Strategy
oferece grande flexibilidade para definir loops de treinamento. Com o ParameterServerStrategy
definido acima (como strategy
), você usará um tf.distribute.experimental.coordinator.ClusterCoordinator
para despachar a execução das etapas de treinamento para trabalhadores remotos.
Em seguida, você criará um modelo, definirá um conjunto de dados e uma função de etapa, como fez no loop de treinamento com outros tf.distribute.Strategy
s. Você pode encontrar mais detalhes no tutorial Treinamento personalizado com tf.distribute.Strategy .
Para garantir uma pré-busca eficiente do conjunto de dados, use as APIs de criação de conjunto de dados distribuídas recomendadas mencionadas na seção Etapas de treinamento de despacho para funcionários remotos abaixo. Além disso, certifique-se de chamar Strategy.run
dentro do worker_fn
para aproveitar ao máximo as GPUs alocadas aos trabalhadores. As demais etapas são as mesmas para treinamento com ou sem GPUs.
Vamos criar esses componentes nas seguintes etapas:
Configure os dados
Primeiro, escreva uma função que crie um conjunto de dados que inclua a lógica de pré-processamento implementada pelas camadas de pré-processamento do Keras .
Você criará essas camadas fora do dataset_fn
mas aplicará a transformação dentro do dataset_fn
, pois envolverá o dataset_fn
em um tf.function
, que não permite que variáveis sejam criadas dentro dele.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Gere exemplos de brinquedos em um conjunto de dados:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Em seguida, crie o conjunto de dados de treinamento envolvido em um dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Construir o modelo
Em seguida, crie o modelo e outros objetos. Certifique-se de criar todas as variáveis em strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Vamos confirmar que o uso de FixedShardsPartitioner
dividiu todas as variáveis em dois shards e cada shard foi atribuído a diferentes servidores de parâmetros:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Defina a etapa de treinamento
Terceiro, crie a etapa de treinamento envolvida em um tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
Na função de etapa de treinamento acima, chamar Strategy.run
e Strategy.reduce
no step_fn
pode oferecer suporte a várias GPUs por trabalhador. Se os trabalhadores tiverem GPUs alocadas, o Strategy.run
distribuirá os conjuntos de dados em várias réplicas.
Enviar etapas de treinamento para trabalhadores remotos
Depois que todos os cálculos forem definidos por ParameterServerStrategy
, você usará a classe tf.distribute.experimental.coordinator.ClusterCoordinator
para criar recursos e distribuir as etapas de treinamento para trabalhadores remotos.
Vamos primeiro criar um objeto ClusterCoordinator
e passar o objeto de estratégia:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Em seguida, crie um conjunto de dados por trabalhador e um iterador. No per_worker_dataset_fn
abaixo, é recomendado envolver o dataset_fn
em strategy.distribute_datasets_from_function
para permitir uma pré-busca eficiente para GPUs sem problemas.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
A etapa final é distribuir a computação para trabalhadores remotos usando ClusterCoordinator.schedule
:
- O método
schedule
enfileira umtf.function
e retorna umRemoteValue
semelhante ao futuro imediatamente. As funções enfileiradas serão despachadas para trabalhadores remotos em threads em segundo plano e oRemoteValue
será preenchido de forma assíncrona. - O método
join
(ClusterCoordinator.join
) pode ser usado para esperar até que todas as funções agendadas sejam executadas.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Aqui está como você pode buscar o resultado de um RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
Como alternativa, você pode iniciar todas as etapas e fazer algo enquanto aguarda a conclusão:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Para obter o fluxo de trabalho completo de treinamento e veiculação deste exemplo específico, confira este teste .
Mais sobre a criação de conjuntos de dados
O conjunto de dados no código acima é criado usando a API ClusterCoordinator.create_per_worker_dataset
). Ele cria um conjunto de dados por trabalhador e retorna um objeto de contêiner. Você pode chamar o método iter
nele para criar um iterador por trabalhador. O iterador por trabalhador contém um iterador por trabalhador e a fatia correspondente de um trabalhador será substituída no argumento de entrada da função passada para o método ClusterCoordinator.schedule
antes que a função seja executada em um trabalhador específico.
Atualmente, o método ClusterCoordinator.schedule
assume que os workers são equivalentes e, portanto, assume que os conjuntos de dados em diferentes workers são os mesmos, exceto que podem ser embaralhados de forma diferente se contiverem uma operação Dataset.shuffle
. Por isso, também é recomendável que os conjuntos de dados sejam repetidos indefinidamente e você agende um número finito de etapas em vez de depender do OutOfRangeError
de um conjunto de dados.
Outra observação importante é que os conjuntos de dados tf.data
não suportam serialização e desserialização implícitas entre os limites da tarefa. Portanto, é importante criar todo o conjunto de dados dentro da função passada para ClusterCoordinator.create_per_worker_dataset
.
Avaliação
Há mais de uma maneira de definir e executar um loop de avaliação no treinamento distribuído. Cada um tem seus próprios prós e contras, conforme descrito abaixo. O método de avaliação em linha é recomendado se você não tiver preferência.
Avaliação em linha
Nesse método, o coordenador alterna entre treinamento e avaliação e, por isso, é chamado de avaliação inline .
Existem vários benefícios da avaliação em linha. Por exemplo:
- Ele pode suportar grandes modelos de avaliação e conjuntos de dados de avaliação que uma única tarefa não pode conter.
- Os resultados da avaliação podem ser usados para tomar decisões para treinar a próxima época.
Existem duas maneiras de implementar a avaliação em linha: avaliação direta e avaliação distribuída.
- Avaliação direta : Para modelos pequenos e conjuntos de dados de avaliação, o coordenador pode executar a avaliação diretamente no modelo distribuído com o conjunto de dados de avaliação no coordenador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Avaliação distribuída : para grandes modelos ou conjuntos de dados que são inviáveis de serem executados diretamente no coordenador, a tarefa do coordenador pode distribuir tarefas de avaliação para os trabalhadores por meio dos métodos
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Avaliação do carro lateral
Outro método é chamado de avaliação de carro lateral, onde você cria uma tarefa de avaliador dedicada que lê repetidamente os pontos de verificação e executa a avaliação em um ponto de verificação mais recente. Ele permite que seu programa de treinamento termine mais cedo se você não precisar alterar seu ciclo de treinamento com base nos resultados da avaliação. No entanto, requer uma tarefa de avaliador adicional e checkpoints periódicos para desencadear a avaliação. A seguir está um possível loop de avaliação de side-car:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Aglomerados no mundo real
Em um ambiente de produção real, você executará todas as tarefas em diferentes processos em diferentes máquinas. A maneira mais simples de configurar informações de cluster em cada tarefa é definir variáveis de ambiente "TF_CONFIG"
e usar um tf.distribute.cluster_resolver.TFConfigClusterResolver
para analisar "TF_CONFIG"
.
Para obter uma descrição geral sobre as variáveis de ambiente "TF_CONFIG"
, consulte o guia de treinamento distribuído .
Se você iniciar suas tarefas de treinamento usando o Kubernetes ou outros modelos de configuração, é muito provável que esses modelos já tenham definido “TF_CONFIG"
para você.
Defina a variável de ambiente "TF_CONFIG"
Suponha que você tenha 3 trabalhadores e 2 servidores de parâmetros, o "TF_CONFIG"
do trabalhador 1 pode ser:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
O "TF_CONFIG"
do avaliador pode ser:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
A parte "cluster"
na string "TF_CONFIG"
acima para o avaliador é opcional.
Se você usar o mesmo binário para todas as tarefas
Se você preferir executar todas essas tarefas usando um único binário, precisará permitir que seu programa se ramifique em diferentes funções no início:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
O código a seguir inicia um servidor TensorFlow e aguarda:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Como lidar com falha de tarefa
Falha do trabalhador
tf.distribute.experimental.coordinator.ClusterCoordinator
ou Model.fit
fornecem tolerância a falhas integrada para falhas do trabalhador. Após a recuperação do trabalhador, a função de conjunto de dados fornecida anteriormente (para ClusterCoordinator.create_per_worker_dataset
para um loop de treinamento personalizado ou tf.keras.utils.experimental.DatasetCreator
para Model.fit
) será invocada nos trabalhadores para recriar os conjuntos de dados.
Falha do servidor de parâmetros ou do coordenador
No entanto, quando o coordenador vê um erro do servidor de parâmetros, ele gerará um UnavailableError
ou AbortedError
imediatamente. Você pode reiniciar o coordenador neste caso. O próprio coordenador também pode ficar indisponível. Portanto, algumas ferramentas são recomendadas para não perder o progresso do treinamento:
Para
Model.fit
, você deve usar um retorno de chamadaBackupAndRestore
, que trata o progresso de salvamento e restauração automaticamente. Consulte a seção de retornos de chamada e treinamento acima para obter um exemplo.Para um loop de treinamento personalizado, você deve verificar as variáveis do modelo periodicamente e carregar as variáveis do modelo de um ponto de verificação, se houver, antes do início do treinamento. O progresso do treinamento pode ser inferido aproximadamente a partir de
optimizer.iterations
se um otimizador tiver um ponto de verificação:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Buscando um RemoteValue
A busca de um RemoteValue
é garantida com sucesso se uma função for executada com sucesso. Isso ocorre porque atualmente o valor de retorno é copiado imediatamente para o coordenador após a execução de uma função. Se houver alguma falha do trabalhador durante a cópia, a função será repetida em outro trabalhador disponível. Portanto, se você deseja otimizar o desempenho, pode agendar funções sem um valor de retorno.
Relatório de erros
Uma vez que o coordenador vê um erro como UnavailableError
de servidores de parâmetros ou outros erros de aplicativo como um InvalidArgument
de tf.debugging.check_numerics
, ele cancelará todas as funções pendentes e enfileiradas antes de gerar o erro. Buscar seus RemoteValue
correspondentes gerará um CancelledError
.
Depois que um erro é gerado, o coordenador não levantará o mesmo erro ou qualquer erro de funções canceladas.
Melhoria de desempenho
Existem vários motivos possíveis se você encontrar problemas de desempenho ao treinar com ParameterServerStrategy
e ClusterResolver
.
Um motivo comum é que os servidores de parâmetros têm carga desequilibrada e alguns servidores de parâmetros muito carregados atingiram a capacidade. Também pode haver várias causas-raiz. Alguns métodos simples para mitigar esse problema são:
- Fragmente suas variáveis de modelo grandes especificando um
variable_partitioner
ao construir umParameterServerStrategy
. - Evite criar uma variável de ponto de acesso exigida por todos os servidores de parâmetros em uma única etapa, se possível. Por exemplo, use uma taxa de aprendizado constante ou subclasse
tf.keras.optimizers.schedules.LearningRateSchedule
em otimizadores, pois o comportamento padrão é que a taxa de aprendizado se tornará uma variável colocada em um servidor de parâmetros específico e solicitada por todos os outros servidores de parâmetros em cada etapa . - Embaralhe seus grandes vocabulários antes de passá-los para as camadas de pré-processamento do Keras.
Outra possível razão para problemas de desempenho é o coordenador. Sua primeira implementação de schedule
/ join
é baseada em Python e, portanto, pode ter sobrecarga de encadeamento. Também a latência entre o coordenador e os trabalhadores pode ser grande. Se esse é o caso,
Para
Model.fit
, você pode definir o argumentosteps_per_execution
fornecido emModel.compile
para um valor maior que 1.Para um loop de treinamento personalizado, você pode agrupar várias etapas em um único
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Como a biblioteca é otimizada ainda mais, esperamos que a maioria dos usuários não precise empacotar manualmente as etapas no futuro.
Além disso, um pequeno truque para melhorar o desempenho é agendar funções sem um valor de retorno, conforme explicado na seção de tratamento de falhas de tarefas acima.
Limitações conhecidas
A maioria das limitações conhecidas já está coberta nas seções acima. Esta seção fornece um resumo.
ParameterServerStrategy
geral
-
os.environment["grpc_fail_fast"]="use_caller"
é necessário em todas as tarefas, incluindo o coordenador, para que a tolerância a falhas funcione corretamente. - O treinamento do servidor de parâmetros síncrono não é suportado.
- Geralmente, é necessário agrupar várias etapas em uma única função para obter o desempenho ideal.
- Não há suporte para carregar um saved_model via
tf.saved_model.load
contendo variáveis fragmentadas. Observe que o carregamento de tal saved_model usando o TensorFlow Serving deve funcionar. - Não há suporte para carregar um ponto de verificação contendo variáveis de slot do otimizador fragmentado em um número diferente de fragmentos.
- Não há suporte para recuperação de falha do servidor de parâmetros sem reiniciar a tarefa do coordenador.
- O uso de
tf.lookup.StaticHashTable
(que é comumente empregado por algumas camadas de pré-processamento Keras, comotf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
etf.keras.layers.TextVectorization
) resulta em recursos colocados em o coordenador neste momento com treinamento do servidor de parâmetros. Isso tem implicações de desempenho para RPCs de pesquisa de trabalhadores para o coordenador. Esta é uma alta prioridade atual para resolver.
Especificações do Model.fit
- O argumento
steps_per_epoch
é necessário emModel.fit
. Você pode selecionar um valor que forneça intervalos apropriados em uma época. - O
ParameterServerStrategy
não tem suporte para retornos de chamada personalizados com chamadas em nível de lote por motivos de desempenho. Você deve converter essas chamadas em chamadas de nível de época comsteps_per_epoch
adequadamente escolhidos, para que sejam chamadas a cada número de etapas desteps_per_epoch
. Os retornos de chamada integrados não são afetados: suas chamadas em nível de lote foram modificadas para serem de alto desempenho. O suporte a chamadas em nível de lote paraParameterServerStrategy
está sendo planejado. - Pela mesma razão, ao contrário de outras estratégias, a barra de progresso e as métricas são registradas apenas nos limites da época.
-
run_eagerly
não é suportado.
Especificidades do loop de treinamento personalizado
-
ClusterCoordinator.schedule
não oferece suporte a garantias de visitação para um conjunto de dados. - Quando
ClusterCoordinator.create_per_worker_dataset
é usado, todo o conjunto de dados deve ser criado dentro da função passada para ele. -
tf.data.Options
é ignorado em um conjunto de dados criado porClusterCoordinator.create_per_worker_dataset
.