Treinamento do servidor de parâmetros com ParameterServerStrategy

Veja no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

O treinamento do servidor de parâmetros é um método comum de dados paralelos para aumentar o treinamento do modelo em várias máquinas.

Um cluster de treinamento do servidor de parâmetros consiste em trabalhadores e servidores de parâmetros . As variáveis ​​são criadas em servidores de parâmetros e são lidas e atualizadas pelos trabalhadores em cada etapa. Por padrão, os workers leem e atualizam essas variáveis ​​de forma independente, sem sincronizar entre si. É por isso que, às vezes, o treinamento no estilo do servidor de parâmetros é chamado de treinamento assíncrono .

No TensorFlow 2, o treinamento do servidor de parâmetros é desenvolvido pela classe tf.distribute.experimental.ParameterServerStrategy , que distribui as etapas de treinamento para um cluster que pode ser dimensionado para milhares de trabalhadores (acompanhados por servidores de parâmetros).

Métodos de treinamento suportados

Existem dois principais métodos de treinamento suportados:

Um cluster com jobs e tarefas

Independentemente da API escolhida ( Model.fit ou um loop de treinamento personalizado), o treinamento distribuído no TensorFlow 2 envolve: um 'cluster' com vários 'jobs' e cada um dos jobs pode ter uma ou mais 'tasks' .

Ao usar o treinamento do servidor de parâmetros, é recomendável ter:

  • Um trabalho de coordenador (que tem o nome de trabalho chief )
  • Vários trabalhos de trabalhador (nome do trabalho worker ); e
  • Trabalhos de servidor de vários parâmetros (nome do trabalho ps )

Enquanto o coordenador cria recursos, despacha tarefas de treinamento, escreve pontos de verificação e lida com falhas de tarefas, trabalhadores e servidores de parâmetros executam o tf.distribute.Server que escuta as solicitações do coordenador.

Treinamento do servidor de parâmetros com a API Model.fit

O treinamento do servidor de parâmetros com a API Model.fit requer que o coordenador use um objeto tf.distribute.experimental.ParameterServerStrategy e um tf.keras.utils.experimental.DatasetCreator como entrada. Semelhante ao uso de Model.fit sem estratégia ou com outras estratégias, o fluxo de trabalho envolve a criação e compilação do modelo, preparando os retornos de chamada, seguidos por uma chamada de Model.fit .

Treinamento do servidor de parâmetros com um loop de treinamento personalizado

Com loops de treinamento personalizados, a classe tf.distribute.experimental.coordinator.ClusterCoordinator é o principal componente usado para o coordenador.

A API mais importante fornecida pelo objeto ClusterCoordinator é schedule :

  • A API de schedule enfileira um tf.function e retorna um RemoteValue semelhante ao futuro imediatamente.
  • As funções enfileiradas serão despachadas para trabalhadores remotos em threads em segundo plano e seus RemoteValue s serão preenchidos de forma assíncrona.
  • Como o schedule não requer atribuição de trabalhador, o tf.function passado pode ser executado em qualquer trabalhador disponível.
  • Se o trabalhador em que ela é executada ficar indisponível antes de sua conclusão, a função será repetida em outro trabalhador disponível.
  • Devido a esse fato e ao fato de que a execução da função não é atômica, uma função pode ser executada mais de uma vez.

Além de despachar funções remotas, o ClusterCoordinator também ajuda a criar conjuntos de dados em todos os trabalhadores e reconstruir esses conjuntos de dados quando um trabalhador se recupera de uma falha.

Configuração do tutorial

O tutorial se ramificará em Model.fit e caminhos de loop de treinamento personalizados, e você pode escolher aquele que atende às suas necessidades. Seções diferentes de "Treinando com X" são aplicáveis ​​a ambos os caminhos.

pip install portpicker

Configuração do cluster

Conforme mencionado acima, um cluster de treinamento de servidor de parâmetros requer uma tarefa de coordenador que executa seu programa de treinamento, um ou vários workers e tarefas de servidor de parâmetros que executam servidores tf.distribute.Server — tf.distribute.Server — e possivelmente uma tarefa de avaliação adicional que executa avaliação secundária (veja a seção de avaliação do carro lateral abaixo). Os requisitos para configurá-los são:

  • A tarefa do coordenador precisa conhecer os endereços e as portas de todos os outros servidores TensorFlow, exceto o avaliador.
  • Os workers e os servidores de parâmetros precisam saber em qual porta eles precisam escutar. Para simplificar, geralmente você pode passar as informações completas do cluster ao criar servidores TensorFlow nessas tarefas.
  • A tarefa do avaliador não precisa conhecer a configuração do cluster de treinamento. Se isso acontecer, ele não deve tentar se conectar ao cluster de treinamento.
  • Os workers e os servidores de parâmetros devem ter tipos de tarefas como "worker" e "ps" , respectivamente. O coordenador deve usar "chief" como o tipo de tarefa por motivos de legado.

Neste tutorial, você criará um cluster em processo para que todo o treinamento do servidor de parâmetros possa ser executado no Colab. Você aprenderá como configurar clusters reais em uma seção posterior.

Cluster em processo

Você começará criando vários servidores TensorFlow com antecedência e se conectará a eles posteriormente. Observe que isso é apenas para fins de demonstração deste tutorial, e em treinamento real os servidores serão iniciados em máquinas "worker" e "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

A configuração de cluster em processo é frequentemente usada em testes de unidade, como aqui .

Outra opção para teste local é iniciar processos na máquina local - confira Treinamento de vários trabalhadores com Keras para obter um exemplo dessa abordagem.

Instanciar um ParameterServerStrategy

Antes de mergulhar no código de treinamento, vamos instanciar um objeto ParameterServerStrategy . Observe que isso é necessário independentemente de você estar procedendo com Model.fit ou um loop de treinamento personalizado. O argumento variable_partitioner será explicado na seção Variable sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Para usar GPUs para treinamento, aloque GPUs visíveis para cada trabalhador. O ParameterServerStrategy usará todas as GPUs disponíveis em cada trabalhador, com a restrição de que todos os trabalhadores devem ter o mesmo número de GPUs disponíveis.

Fragmentação variável

A fragmentação de variável refere-se à divisão de uma variável em várias variáveis ​​menores, que são chamadas de fragmentos . A fragmentação variável pode ser útil para distribuir a carga da rede ao acessar esses fragmentos. Também é útil distribuir computação e armazenamento de uma variável normal em vários servidores de parâmetros.

Para habilitar a fragmentação de variável, você pode passar um variable_partitioner ao construir um objeto ParameterServerStrategy . O variable_partitioner será invocado sempre que uma variável for criada e espera-se que retorne o número de shards ao longo de cada dimensão da variável. Alguns variable_partitioner s prontos para uso são fornecidos, como tf.distribute.experimental.partitioners.MinSizePartitioner . Recomenda-se usar particionadores baseados em tamanho, como tf.distribute.experimental.partitioners.MinSizePartitioner para evitar particionar variáveis ​​pequenas, que podem ter um impacto negativo na velocidade de treinamento do modelo.

Quando um variable_partitioner é passado e se você criar uma variável diretamente sob strategy.scope() , ela se tornará um tipo de contêiner com uma propriedade variables que fornece acesso à lista de shards. Na maioria dos casos, esse contêiner será convertido automaticamente em um tensor concatenando todos os shards. Como resultado, ela pode ser usada como uma variável normal. Por outro lado, alguns métodos do TensorFlow, como tf.nn.embedding_lookup , fornecem implementação eficiente para esse tipo de contêiner e, nesses métodos, a concatenação automática será evitada.

Consulte os documentos da API de tf.distribute.experimental.ParameterServerStrategy para obter mais detalhes.

Treinamento com Model.fit

O Keras fornece uma API de treinamento fácil de usar via Model.fit que lida com o loop de treinamento sob o capô, com a flexibilidade do train_step substituível e retornos de chamada, que fornecem funcionalidades como salvar pontos de verificação ou salvar resumos para o TensorBoard. Com Model.fit , o mesmo código de treinamento pode ser usado para outras estratégias com uma simples troca do objeto de estratégia.

Dados de entrada

Model.fit com treinamento de servidor de parâmetros requer que os dados de entrada sejam fornecidos em um callable que receba um único argumento do tipo tf.distribute.InputContext e retorne um tf.data.Dataset . Em seguida, crie um objeto tf.keras.utils.experimental.DatasetCreator que receba tal callable e um objeto tf.distribute.InputOptions opcional por meio do argumento input_options .

Observe que é recomendável embaralhar e repetir os dados com o treinamento do servidor de parâmetros e especificar steps_per_epoch na chamada de fit para que a biblioteca conheça os limites da época.

Consulte o tutorial de entrada distribuída para obter mais informações sobre o argumento InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

O código em dataset_fn será invocado no dispositivo de entrada, que geralmente é a CPU, em cada uma das máquinas de trabalho.

Construção e compilação de modelos

Agora, você criará um tf.keras.Model — um modelo tf.keras.models.Sequential trivial para fins de demonstração — seguido por uma chamada Model.compile para incorporar componentes, como um otimizador, métricas ou parâmetros como steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Retorno de chamadas e treinamento

Antes de chamar model.fit para o treinamento real, vamos preparar os retornos de chamada necessários para tarefas comuns, como:

  • ModelCheckpoint : para salvar os pesos do modelo.
  • BackupAndRestore : para garantir que o progresso do treinamento seja automaticamente copiado e recuperado se o cluster apresentar indisponibilidade (como anulação ou preempção); ou
  • TensorBoard : para salvar os relatórios de progresso em arquivos de resumo, que são visualizados na ferramenta TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Uso direto com ClusterCoordinator (opcional)

Mesmo se você escolher o caminho de treinamento Model.fit , você pode opcionalmente instanciar um objeto tf.distribute.experimental.coordinator.ClusterCoordinator para agendar outras funções que você gostaria que fossem executadas nos workers. Consulte a seção Treinamento com um loop de treinamento personalizado para obter mais detalhes e exemplos.

Treinamento com um loop de treinamento personalizado

O uso de loops de treinamento personalizados com tf.distribute.Strategy oferece grande flexibilidade para definir loops de treinamento. Com o ParameterServerStrategy definido acima (como strategy ), você usará um tf.distribute.experimental.coordinator.ClusterCoordinator para despachar a execução das etapas de treinamento para trabalhadores remotos.

Em seguida, você criará um modelo, definirá um conjunto de dados e uma função de etapa, como fez no loop de treinamento com outros tf.distribute.Strategy s. Você pode encontrar mais detalhes no tutorial Treinamento personalizado com tf.distribute.Strategy .

Para garantir uma pré-busca eficiente do conjunto de dados, use as APIs de criação de conjunto de dados distribuídas recomendadas mencionadas na seção Etapas de treinamento de despacho para funcionários remotos abaixo. Além disso, certifique-se de chamar Strategy.run dentro do worker_fn para aproveitar ao máximo as GPUs alocadas aos trabalhadores. As demais etapas são as mesmas para treinamento com ou sem GPUs.

Vamos criar esses componentes nas seguintes etapas:

Configure os dados

Primeiro, escreva uma função que crie um conjunto de dados que inclua a lógica de pré-processamento implementada pelas camadas de pré-processamento do Keras .

Você criará essas camadas fora do dataset_fn mas aplicará a transformação dentro do dataset_fn , pois envolverá o dataset_fn em um tf.function , que não permite que variáveis ​​sejam criadas dentro dele.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Gere exemplos de brinquedos em um conjunto de dados:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Em seguida, crie o conjunto de dados de treinamento envolvido em um dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construir o modelo

Em seguida, crie o modelo e outros objetos. Certifique-se de criar todas as variáveis ​​em strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Vamos confirmar que o uso de FixedShardsPartitioner dividiu todas as variáveis ​​em dois shards e cada shard foi atribuído a diferentes servidores de parâmetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Defina a etapa de treinamento

Terceiro, crie a etapa de treinamento envolvida em um tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Na função de etapa de treinamento acima, chamar Strategy.run e Strategy.reduce no step_fn pode oferecer suporte a várias GPUs por trabalhador. Se os trabalhadores tiverem GPUs alocadas, o Strategy.run distribuirá os conjuntos de dados em várias réplicas.

Enviar etapas de treinamento para trabalhadores remotos

Depois que todos os cálculos forem definidos por ParameterServerStrategy , você usará a classe tf.distribute.experimental.coordinator.ClusterCoordinator para criar recursos e distribuir as etapas de treinamento para trabalhadores remotos.

Vamos primeiro criar um objeto ClusterCoordinator e passar o objeto de estratégia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Em seguida, crie um conjunto de dados por trabalhador e um iterador. No per_worker_dataset_fn abaixo, é recomendado envolver o dataset_fn em strategy.distribute_datasets_from_function para permitir uma pré-busca eficiente para GPUs sem problemas.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

A etapa final é distribuir a computação para trabalhadores remotos usando ClusterCoordinator.schedule :

  • O método schedule enfileira um tf.function e retorna um RemoteValue semelhante ao futuro imediatamente. As funções enfileiradas serão despachadas para trabalhadores remotos em threads em segundo plano e o RemoteValue será preenchido de forma assíncrona.
  • O método join ( ClusterCoordinator.join ) pode ser usado para esperar até que todas as funções agendadas sejam executadas.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Aqui está como você pode buscar o resultado de um RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Como alternativa, você pode iniciar todas as etapas e fazer algo enquanto aguarda a conclusão:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para obter o fluxo de trabalho completo de treinamento e veiculação deste exemplo específico, confira este teste .

Mais sobre a criação de conjuntos de dados

O conjunto de dados no código acima é criado usando a API ClusterCoordinator.create_per_worker_dataset ). Ele cria um conjunto de dados por trabalhador e retorna um objeto de contêiner. Você pode chamar o método iter nele para criar um iterador por trabalhador. O iterador por trabalhador contém um iterador por trabalhador e a fatia correspondente de um trabalhador será substituída no argumento de entrada da função passada para o método ClusterCoordinator.schedule antes que a função seja executada em um trabalhador específico.

Atualmente, o método ClusterCoordinator.schedule assume que os workers são equivalentes e, portanto, assume que os conjuntos de dados em diferentes workers são os mesmos, exceto que podem ser embaralhados de forma diferente se contiverem uma operação Dataset.shuffle . Por isso, também é recomendável que os conjuntos de dados sejam repetidos indefinidamente e você agende um número finito de etapas em vez de depender do OutOfRangeError de um conjunto de dados.

Outra observação importante é que os conjuntos de dados tf.data não suportam serialização e desserialização implícitas entre os limites da tarefa. Portanto, é importante criar todo o conjunto de dados dentro da função passada para ClusterCoordinator.create_per_worker_dataset .

Avaliação

Há mais de uma maneira de definir e executar um loop de avaliação no treinamento distribuído. Cada um tem seus próprios prós e contras, conforme descrito abaixo. O método de avaliação em linha é recomendado se você não tiver preferência.

Avaliação em linha

Nesse método, o coordenador alterna entre treinamento e avaliação e, por isso, é chamado de avaliação inline .

Existem vários benefícios da avaliação em linha. Por exemplo:

  • Ele pode suportar grandes modelos de avaliação e conjuntos de dados de avaliação que uma única tarefa não pode conter.
  • Os resultados da avaliação podem ser usados ​​para tomar decisões para treinar a próxima época.

Existem duas maneiras de implementar a avaliação em linha: avaliação direta e avaliação distribuída.

  • Avaliação direta : Para modelos pequenos e conjuntos de dados de avaliação, o coordenador pode executar a avaliação diretamente no modelo distribuído com o conjunto de dados de avaliação no coordenador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Avaliação distribuída : para grandes modelos ou conjuntos de dados que são inviáveis ​​de serem executados diretamente no coordenador, a tarefa do coordenador pode distribuir tarefas de avaliação para os trabalhadores por meio dos métodos ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Avaliação do carro lateral

Outro método é chamado de avaliação de carro lateral, onde você cria uma tarefa de avaliador dedicada que lê repetidamente os pontos de verificação e executa a avaliação em um ponto de verificação mais recente. Ele permite que seu programa de treinamento termine mais cedo se você não precisar alterar seu ciclo de treinamento com base nos resultados da avaliação. No entanto, requer uma tarefa de avaliador adicional e checkpoints periódicos para desencadear a avaliação. A seguir está um possível loop de avaliação de side-car:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Aglomerados no mundo real

Em um ambiente de produção real, você executará todas as tarefas em diferentes processos em diferentes máquinas. A maneira mais simples de configurar informações de cluster em cada tarefa é definir variáveis ​​de ambiente "TF_CONFIG" e usar um tf.distribute.cluster_resolver.TFConfigClusterResolver para analisar "TF_CONFIG" .

Para obter uma descrição geral sobre as variáveis ​​de ambiente "TF_CONFIG" , consulte o guia de treinamento distribuído .

Se você iniciar suas tarefas de treinamento usando o Kubernetes ou outros modelos de configuração, é muito provável que esses modelos já tenham definido “TF_CONFIG" para você.

Defina a variável de ambiente "TF_CONFIG"

Suponha que você tenha 3 trabalhadores e 2 servidores de parâmetros, o "TF_CONFIG" do trabalhador 1 pode ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

O "TF_CONFIG" do avaliador pode ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

A parte "cluster" na string "TF_CONFIG" acima para o avaliador é opcional.

Se você usar o mesmo binário para todas as tarefas

Se você preferir executar todas essas tarefas usando um único binário, precisará permitir que seu programa se ramifique em diferentes funções no início:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

O código a seguir inicia um servidor TensorFlow e aguarda:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Como lidar com falha de tarefa

Falha do trabalhador

tf.distribute.experimental.coordinator.ClusterCoordinator ou Model.fit fornecem tolerância a falhas integrada para falhas do trabalhador. Após a recuperação do trabalhador, a função de conjunto de dados fornecida anteriormente (para ClusterCoordinator.create_per_worker_dataset para um loop de treinamento personalizado ou tf.keras.utils.experimental.DatasetCreator para Model.fit ) será invocada nos trabalhadores para recriar os conjuntos de dados.

Falha do servidor de parâmetros ou do coordenador

No entanto, quando o coordenador vê um erro do servidor de parâmetros, ele gerará um UnavailableError ou AbortedError imediatamente. Você pode reiniciar o coordenador neste caso. O próprio coordenador também pode ficar indisponível. Portanto, algumas ferramentas são recomendadas para não perder o progresso do treinamento:

  • Para Model.fit , você deve usar um retorno de chamada BackupAndRestore , que trata o progresso de salvamento e restauração automaticamente. Consulte a seção de retornos de chamada e treinamento acima para obter um exemplo.

  • Para um loop de treinamento personalizado, você deve verificar as variáveis ​​do modelo periodicamente e carregar as variáveis ​​do modelo de um ponto de verificação, se houver, antes do início do treinamento. O progresso do treinamento pode ser inferido aproximadamente a partir de optimizer.iterations se um otimizador tiver um ponto de verificação:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Buscando um RemoteValue

A busca de um RemoteValue é garantida com sucesso se uma função for executada com sucesso. Isso ocorre porque atualmente o valor de retorno é copiado imediatamente para o coordenador após a execução de uma função. Se houver alguma falha do trabalhador durante a cópia, a função será repetida em outro trabalhador disponível. Portanto, se você deseja otimizar o desempenho, pode agendar funções sem um valor de retorno.

Relatório de erros

Uma vez que o coordenador vê um erro como UnavailableError de servidores de parâmetros ou outros erros de aplicativo como um InvalidArgument de tf.debugging.check_numerics , ele cancelará todas as funções pendentes e enfileiradas antes de gerar o erro. Buscar seus RemoteValue correspondentes gerará um CancelledError .

Depois que um erro é gerado, o coordenador não levantará o mesmo erro ou qualquer erro de funções canceladas.

Melhoria de desempenho

Existem vários motivos possíveis se você encontrar problemas de desempenho ao treinar com ParameterServerStrategy e ClusterResolver .

Um motivo comum é que os servidores de parâmetros têm carga desequilibrada e alguns servidores de parâmetros muito carregados atingiram a capacidade. Também pode haver várias causas-raiz. Alguns métodos simples para mitigar esse problema são:

  1. Fragmente suas variáveis ​​de modelo grandes especificando um variable_partitioner ao construir um ParameterServerStrategy .
  2. Evite criar uma variável de ponto de acesso exigida por todos os servidores de parâmetros em uma única etapa, se possível. Por exemplo, use uma taxa de aprendizado constante ou subclasse tf.keras.optimizers.schedules.LearningRateSchedule em otimizadores, pois o comportamento padrão é que a taxa de aprendizado se tornará uma variável colocada em um servidor de parâmetros específico e solicitada por todos os outros servidores de parâmetros em cada etapa .
  3. Embaralhe seus grandes vocabulários antes de passá-los para as camadas de pré-processamento do Keras.

Outra possível razão para problemas de desempenho é o coordenador. Sua primeira implementação de schedule / join é baseada em Python e, portanto, pode ter sobrecarga de encadeamento. Também a latência entre o coordenador e os trabalhadores pode ser grande. Se esse é o caso,

  • Para Model.fit , você pode definir o argumento steps_per_execution fornecido em Model.compile para um valor maior que 1.

  • Para um loop de treinamento personalizado, você pode agrupar várias etapas em um único tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Como a biblioteca é otimizada ainda mais, esperamos que a maioria dos usuários não precise empacotar manualmente as etapas no futuro.

Além disso, um pequeno truque para melhorar o desempenho é agendar funções sem um valor de retorno, conforme explicado na seção de tratamento de falhas de tarefas acima.

Limitações conhecidas

A maioria das limitações conhecidas já está coberta nas seções acima. Esta seção fornece um resumo.

ParameterServerStrategy geral

  • os.environment["grpc_fail_fast"]="use_caller" é necessário em todas as tarefas, incluindo o coordenador, para que a tolerância a falhas funcione corretamente.
  • O treinamento do servidor de parâmetros síncrono não é suportado.
  • Geralmente, é necessário agrupar várias etapas em uma única função para obter o desempenho ideal.
  • Não há suporte para carregar um saved_model via tf.saved_model.load contendo variáveis ​​fragmentadas. Observe que o carregamento de tal saved_model usando o TensorFlow Serving deve funcionar.
  • Não há suporte para carregar um ponto de verificação contendo variáveis ​​de slot do otimizador fragmentado em um número diferente de fragmentos.
  • Não há suporte para recuperação de falha do servidor de parâmetros sem reiniciar a tarefa do coordenador.
  • O uso de tf.lookup.StaticHashTable (que é comumente empregado por algumas camadas de pré-processamento Keras, como tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup e tf.keras.layers.TextVectorization ) resulta em recursos colocados em o coordenador neste momento com treinamento do servidor de parâmetros. Isso tem implicações de desempenho para RPCs de pesquisa de trabalhadores para o coordenador. Esta é uma alta prioridade atual para resolver.

Especificações do Model.fit

  • O argumento steps_per_epoch é necessário em Model.fit . Você pode selecionar um valor que forneça intervalos apropriados em uma época.
  • O ParameterServerStrategy não tem suporte para retornos de chamada personalizados com chamadas em nível de lote por motivos de desempenho. Você deve converter essas chamadas em chamadas de nível de época com steps_per_epoch adequadamente escolhidos, para que sejam chamadas a cada número de etapas de steps_per_epoch . Os retornos de chamada integrados não são afetados: suas chamadas em nível de lote foram modificadas para serem de alto desempenho. O suporte a chamadas em nível de lote para ParameterServerStrategy está sendo planejado.
  • Pela mesma razão, ao contrário de outras estratégias, a barra de progresso e as métricas são registradas apenas nos limites da época.
  • run_eagerly não é suportado.

Especificidades do loop de treinamento personalizado