Treinamento distribuído com TensorFlow

Veja no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

tf.distribute.Strategy é uma API do TensorFlow para distribuir treinamento em várias GPUs, várias máquinas ou TPUs. Usando esta API, você pode distribuir seus modelos existentes e código de treinamento com alterações mínimas de código.

tf.distribute.Strategy foi projetado com estes objetivos principais em mente:

  • Fácil de usar e oferece suporte a vários segmentos de usuários, incluindo pesquisadores, engenheiros de aprendizado de máquina etc.
  • Forneça um bom desempenho fora da caixa.
  • Fácil alternância entre estratégias.

Você pode distribuir treinamento usando tf.distribute.Strategy com uma API de alto nível como Keras Model.fit , bem como loops de treinamento personalizados (e, em geral, qualquer computação usando TensorFlow).

No TensorFlow 2.x, você pode executar seus programas avidamente ou em um gráfico usando tf.function . tf.distribute.Strategy pretende oferecer suporte a esses dois modos de execução, mas funciona melhor com tf.function . O modo Eager é recomendado apenas para fins de depuração e não é compatível com tf.distribute.TPUStrategy . Embora o treinamento seja o foco deste guia, essa API também pode ser usada para distribuir avaliação e previsão em diferentes plataformas.

Você pode usar tf.distribute.Strategy com poucas alterações em seu código, porque os componentes subjacentes do TensorFlow foram alterados para se tornarem cientes da estratégia. Isso inclui variáveis, camadas, modelos, otimizadores, métricas, resumos e pontos de verificação.

Neste guia, você aprenderá sobre vários tipos de estratégias e como usá-las em diferentes situações. Para saber como depurar problemas de desempenho, confira o guia de desempenho Optimize TensorFlow GPU .

Configurar o TensorFlow

import tensorflow as tf

Tipos de estratégias

tf.distribute.Strategy pretende cobrir vários casos de uso em diferentes eixos. Algumas dessas combinações são atualmente suportadas e outras serão adicionadas no futuro. Alguns desses eixos são:

  • Treinamento síncrono vs assíncrono: Estas são duas maneiras comuns de distribuir treinamento com paralelismo de dados. No treinamento de sincronização, todos os trabalhadores treinam em diferentes fatias de dados de entrada em sincronia e agregam gradientes em cada etapa. No treinamento assíncrono, todos os trabalhadores estão treinando independentemente sobre os dados de entrada e atualizando as variáveis ​​de forma assíncrona. Normalmente, o treinamento de sincronização é suportado por meio de redução total e assíncrona por meio da arquitetura do servidor de parâmetros.
  • Plataforma de hardware: convém dimensionar seu treinamento para várias GPUs em uma máquina ou várias máquinas em uma rede (com 0 ou mais GPUs cada) ou em Cloud TPUs.

Para dar suporte a esses casos de uso, o TensorFlow tem MirroredStrategy , TPUStrategy , MultiWorkerMirroredStrategy , ParameterServerStrategy , CentralStorageStrategy , além de outras estratégias disponíveis. A próxima seção explica quais deles têm suporte em quais cenários no TensorFlow. Aqui está uma visão geral rápida:

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras Model.fit Compatível Compatível Compatível Suporte experimental Suporte experimental
Loop de treinamento personalizado Compatível Compatível Compatível Suporte experimental Suporte experimental
API do estimador Suporte limitado Não suportado Suporte limitado Suporte limitado Suporte limitado

Estratégia espelhada

tf.distribute.MirroredStrategy suporta treinamento distribuído síncrono em várias GPUs em uma máquina. Ele cria uma réplica por dispositivo GPU. Cada variável no modelo é espelhada em todas as réplicas. Juntas, essas variáveis ​​formam uma única variável conceitual chamada MirroredVariable . Essas variáveis ​​são mantidas em sincronia entre si aplicando atualizações idênticas.

Algoritmos eficientes de redução total são usados ​​para comunicar as atualizações de variáveis ​​entre os dispositivos. All-reduce agrega tensores em todos os dispositivos adicionando-os e disponibilizando-os em cada dispositivo. É um algoritmo fundido que é muito eficiente e pode reduzir significativamente a sobrecarga de sincronização. Existem muitos algoritmos e implementações de redução total disponíveis, dependendo do tipo de comunicação disponível entre os dispositivos. Por padrão, ele usa a NVIDIA Collective Communication Library ( NCCL ) como a implementação de redução total. Você pode escolher entre algumas outras opções ou escrever a sua própria.

Aqui está a maneira mais simples de criar MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Isso criará uma instância MirroredStrategy , que usará todas as GPUs visíveis para o TensorFlow e NCCL, como a comunicação entre dispositivos.

Se você deseja usar apenas algumas das GPUs em sua máquina, pode fazê-lo assim:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Se você deseja substituir a comunicação entre dispositivos, pode fazê-lo usando o argumento cross_device_ops fornecendo uma instância de tf.distribute.CrossDeviceOps . Atualmente, tf.distribute.HierarchicalCopyAllReduce e tf.distribute.ReductionToOneDevice são duas opções diferentes de tf.distribute.NcclAllReduce , que é o padrão.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Estratégia TPU

tf.distribute.TPUStrategy permite que você execute seu treinamento do TensorFlow em unidades de processamento de tensor (TPUs) . As TPUs são ASICs especializadas do Google projetadas para acelerar drasticamente as cargas de trabalho de aprendizado de máquina. Eles estão disponíveis no Google Colab , no TPU Research Cloud e no Cloud TPU .

Em termos de arquitetura de treinamento distribuído, TPUStrategy é o mesmo MirroredStrategy — ele implementa treinamento distribuído síncrono. As TPUs fornecem sua própria implementação de operações de redução total e outras operações coletivas eficientes em vários núcleos de TPU, que são usados ​​no TPUStrategy .

Aqui está como você instanciaria TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

A instância TPUClusterResolver ajuda a localizar as TPUs. No Colab, você não precisa especificar nenhum argumento para ele.

Se você quiser usar isso para Cloud TPUs:

  • Você deve especificar o nome do seu recurso TPU no argumento tpu .
  • Você deve inicializar o sistema TPU explicitamente no início do programa. Isso é necessário antes que as TPUs possam ser usadas para computação. A inicialização do sistema TPU também elimina a memória da TPU, portanto, é importante concluir esta etapa primeiro para evitar a perda de estado.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy é muito semelhante a MirroredStrategy . Ele implementa treinamento distribuído síncrono em vários trabalhadores, cada um com potencialmente várias GPUs. Semelhante ao tf.distribute.MirroredStrategy , ele cria cópias de todas as variáveis ​​no modelo em cada dispositivo em todos os trabalhadores.

Aqui está a maneira mais simples de criar MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy tem duas implementações para comunicações entre dispositivos. CommunicationImplementation.RING é baseado em RPC e suporta CPUs e GPUs. CommunicationImplementation.NCCL usa NCCL e fornece desempenho de última geração em GPUs, mas não suporta CPUs. CollectiveCommunication.AUTO adia a escolha para o Tensorflow. Você pode especificá-los da seguinte maneira:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Uma das principais diferenças para obter o treinamento de vários trabalhadores, em comparação com o treinamento de várias GPUs, é a configuração de vários trabalhadores. A variável de ambiente 'TF_CONFIG' é a maneira padrão no TensorFlow de especificar a configuração do cluster para cada trabalhador que faz parte do cluster. Saiba mais na seção de configuração do TF_CONFIG deste documento.

Para obter mais detalhes sobre MultiWorkerMirroredStrategy , considere os seguintes tutoriais:

ParameterServerStrategy

O treinamento do servidor de parâmetros é um método comum de dados paralelos para aumentar o treinamento do modelo em várias máquinas. Um cluster de treinamento do servidor de parâmetros consiste em trabalhadores e servidores de parâmetros. As variáveis ​​são criadas em servidores de parâmetros e são lidas e atualizadas pelos trabalhadores em cada etapa. Confira o tutorial de treinamento do servidor de parâmetros para obter detalhes.

No TensorFlow 2, o treinamento do servidor de parâmetros usa uma arquitetura baseada em coordenador central por meio da classe tf.distribute.experimental.coordinator.ClusterCoordinator .

Nesta implementação, as tarefas do worker e parameter server executam tf.distribute.Server s que escutam as tarefas do coordenador. O coordenador cria recursos, despacha tarefas de treinamento, escreve pontos de verificação e lida com falhas de tarefas.

Na programação em execução no coordenador, você usará um objeto ParameterServerStrategy para definir uma etapa de treinamento e usará um ClusterCoordinator para despachar etapas de treinamento para trabalhadores remotos. Aqui está a maneira mais simples de criá-los:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Para saber mais sobre ParameterServerStrategy , confira o treinamento do servidor de parâmetros com Keras Model.fit e um tutorial de loop de treinamento personalizado .

No TensorFlow 1, o ParameterServerStrategy está disponível apenas com um Estimator por meio do símbolo tf.compat.v1.distribute.experimental.ParameterServerStrategy .

Estratégia de armazenamento central

tf.distribute.experimental.CentralStorageStrategy também faz treinamento síncrono. As variáveis ​​não são espelhadas, em vez disso, são colocadas na CPU e as operações são replicadas em todas as GPUs locais. Se houver apenas uma GPU, todas as variáveis ​​e operações serão colocadas nessa GPU.

Crie uma instância de CentralStorageStrategy :

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Isso criará uma instância CentralStorageStrategy que usará todas as GPUs e CPU visíveis. A atualização das variáveis ​​nas réplicas será agregada antes de ser aplicada às variáveis.

Outras estratégias

Além das estratégias acima, há duas outras estratégias que podem ser úteis para prototipagem e depuração ao usar APIs tf.distribute .

Estratégia padrão

A Estratégia Padrão é uma estratégia de distribuição que está presente quando nenhuma estratégia de distribuição explícita está no escopo. Ele implementa a interface tf.distribute.Strategy , mas é um pass-through e não fornece distribuição real. Por exemplo, Strategy.run(fn) simplesmente chamará fn . O código escrito usando essa estratégia deve se comportar exatamente como o código escrito sem nenhuma estratégia. Você pode pensar nisso como uma estratégia "sem operação".

A Estratégia Padrão é um singleton – e não se pode criar mais instâncias dele. Ele pode ser obtido usando tf.distribute.get_strategy fora do escopo de qualquer estratégia explícita (a mesma API que pode ser usada para obter a estratégia atual dentro do escopo de uma estratégia explícita).

default_strategy = tf.distribute.get_strategy()

Essa estratégia serve a dois propósitos principais:

  • Ele permite escrever código de biblioteca com reconhecimento de distribuição incondicionalmente. Por exemplo, em tf.optimizer s você pode usar tf.distribute.get_strategy e usar essa estratégia para reduzir gradientes—ela sempre retornará um objeto de estratégia no qual você pode chamar a API Strategy.reduce .
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Semelhante ao código de biblioteca, ele pode ser usado para escrever programas de usuários finais para trabalhar com e sem estratégia de distribuição, sem exigir lógica condicional. Aqui está um trecho de código de exemplo que ilustra isso:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy é uma estratégia para colocar todas as variáveis ​​e computação em um único dispositivo especificado.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Essa estratégia é distinta da Estratégia Padrão de várias maneiras. Na estratégia padrão, a lógica de posicionamento da variável permanece inalterada quando comparada à execução do TensorFlow sem nenhuma estratégia de distribuição. Mas ao usar OneDeviceStrategy , todas as variáveis ​​criadas em seu escopo são colocadas explicitamente no dispositivo especificado. Além disso, quaisquer funções chamadas via OneDeviceStrategy.run também serão colocadas no dispositivo especificado.

A entrada distribuída por meio dessa estratégia será pré-buscada para o dispositivo especificado. Na Estratégia Padrão, não há distribuição de entrada.

Semelhante à estratégia padrão, essa estratégia também pode ser usada para testar seu código antes de alternar para outras estratégias que realmente distribuem para vários dispositivos/máquinas. Isso exercitará o mecanismo da estratégia de distribuição um pouco mais do que a Estratégia Padrão, mas não em toda a extensão do uso, por exemplo, MirroredStrategy ou TPUStrategy . Se você quiser um código que se comporte como se não houvesse uma estratégia, use a Estratégia Padrão.

Até agora você aprendeu sobre diferentes estratégias e como você pode instanciá-las. As próximas seções mostram as diferentes maneiras pelas quais você pode usá-los para distribuir seu treinamento.

Use tf.distribute.Strategy com Keras Model.fit

tf.distribute.Strategy está integrado ao tf.keras , que é a implementação do TensorFlow da especificação da API Keras . tf.keras é uma API de alto nível para construir e treinar modelos. Ao integrar o backend tf.keras , é fácil distribuir seu treinamento escrito na estrutura de treinamento Keras usando Model.fit .

Aqui está o que você precisa alterar no seu código:

  1. Crie uma instância do tf.distribute.Strategy apropriado.
  2. Mova a criação do modelo, otimizador e métricas Keras dentro do strategy.scope .

As estratégias de distribuição do TensorFlow são compatíveis com todos os tipos de modelos Keras — Sequential , Functional e subclassed .

Aqui está um trecho de código para fazer isso para um modelo Keras muito simples com uma camada Dense :

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Este exemplo usa MirroredStrategy , para que você possa executá-lo em uma máquina com várias GPUs. strategy.scope() indica ao Keras qual estratégia usar para distribuir o treinamento. A criação de modelos/otimizadores/métricas dentro desse escopo permite que você crie variáveis ​​distribuídas em vez de variáveis ​​regulares. Uma vez que isso esteja configurado, você pode ajustar seu modelo como faria normalmente. MirroredStrategy cuida de replicar o treinamento do modelo nas GPUs disponíveis, agregando gradientes e muito mais.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 2.2552
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9968
2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.6190
0.6190494298934937

Aqui, um tf.data.Dataset fornece a entrada de treinamento e avaliação. Você também pode usar matrizes NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 0.4406
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1947
<keras.callbacks.History at 0x7fb81813d2d0>

Em ambos os casos - com Dataset ou NumPy - cada lote da entrada fornecida é dividido igualmente entre as várias réplicas. Por exemplo, se você estiver usando o MirroredStrategy com 2 GPUs, cada lote de tamanho 10 será dividido entre as 2 GPUs, com cada uma recebendo 5 exemplos de entrada em cada etapa. Cada época será treinada mais rapidamente à medida que você adiciona mais GPUs. Normalmente, você deseja aumentar o tamanho do lote à medida que adiciona mais aceleradores, de modo a fazer uso efetivo do poder de computação extra. Você também precisará reajustar sua taxa de aprendizado, dependendo do modelo. Você pode usar strategy.num_replicas_in_sync para obter o número de réplicas.

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

O que é suportado agora?

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras Model.fit Compatível Compatível Compatível Suporte experimental Suporte experimental

Exemplos e tutoriais

Aqui está uma lista de tutoriais e exemplos que ilustram a integração acima de ponta a ponta com Keras Model.fit :

  1. Tutorial : Treinamento com Model.fit e MirroredStrategy .
  2. Tutorial : Treinamento com Model.fit e MultiWorkerMirroredStrategy .
  3. Guia : Contém um exemplo de uso de Model.fit e TPUStrategy .
  4. Tutorial : Treinamento do servidor de parâmetros com Model.fit e ParameterServerStrategy .
  5. Tutorial : Ajustando o BERT para muitas tarefas do benchmark GLUE com Model.fit e TPUStrategy .
  6. Repositório do TensorFlow Model Garden contendo coleções de modelos de última geração implementados usando várias estratégias.

Use tf.distribute.Strategy com loops de treinamento personalizados

Conforme demonstrado acima, usar tf.distribute.Strategy com Keras Model.fit requer a alteração de apenas algumas linhas do seu código. Com um pouco mais de esforço, você também pode usar tf.distribute.Strategy com loops de treinamento personalizados .

Se você precisar de mais flexibilidade e controle sobre seus loops de treinamento do que é possível com Estimator ou Keras, você pode escrever loops de treinamento personalizados. Por exemplo, ao usar um GAN, você pode querer executar um número diferente de etapas geradoras ou discriminadoras a cada rodada. Da mesma forma, as estruturas de alto nível não são muito adequadas para o treinamento de Aprendizado por Reforço.

As classes tf.distribute.Strategy fornecem um conjunto básico de métodos para dar suporte a loops de treinamento personalizados. O uso deles pode exigir uma pequena reestruturação do código inicialmente, mas, uma vez feito isso, você poderá alternar entre GPUs, TPUs e várias máquinas simplesmente alterando a instância da estratégia.

Abaixo está um breve trecho que ilustra este caso de uso para um exemplo de treinamento simples usando o mesmo modelo Keras de antes.

Primeiro, crie o modelo e o otimizador dentro do escopo da estratégia. Isso garante que quaisquer variáveis ​​criadas com o modelo e o otimizador sejam variáveis ​​espelhadas.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Em seguida, crie o conjunto de dados de entrada e chame tf.distribute.Strategy.experimental_distribute_dataset para distribuir o conjunto de dados com base na estratégia.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Em seguida, defina uma etapa do treinamento. Use tf.GradientTape para calcular gradientes e o otimizador para aplicar esses gradientes para atualizar as variáveis ​​do seu modelo. Para distribuir esta etapa de treinamento, coloque-a em uma função train_step e passe-a para tf.distribute.Strategy.run junto com as entradas do conjunto de dados que você obteve do dist_dataset criado anteriormente:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Algumas outras coisas a serem observadas no código acima:

  1. Você usou tf.nn.compute_average_loss para calcular a perda. tf.nn.compute_average_loss soma a perda por exemplo e divide a soma pelo global_batch_size . Isso é importante porque depois que os gradientes são calculados em cada réplica, eles são agregados entre as réplicas somando -os.
  2. Você também usou a API tf.distribute.Strategy.reduce para agregar os resultados retornados por tf.distribute.Strategy.run . tf.distribute.Strategy.run retorna resultados de cada réplica local na estratégia e há várias maneiras de consumir esse resultado. Você pode reduce -los para obter um valor agregado. Você também pode fazer tf.distribute.Strategy.experimental_local_results para obter a lista de valores contidos no resultado, um por réplica local.
  3. Quando você chama apply_gradients em um escopo de estratégia de distribuição, seu comportamento é modificado. Especificamente, antes de aplicar gradientes em cada instância paralela durante o treinamento síncrono, ele executa uma soma sobre todas as réplicas dos gradientes.

Por fim, depois de definir a etapa de treinamento, você pode iterar sobre dist_dataset e executar o treinamento em um loop:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32)
tf.Tensor(0.18628375, shape=(), dtype=float32)
tf.Tensor(0.18570684, shape=(), dtype=float32)
tf.Tensor(0.18513316, shape=(), dtype=float32)
tf.Tensor(0.1845627, shape=(), dtype=float32)
tf.Tensor(0.18399543, shape=(), dtype=float32)
tf.Tensor(0.18343134, shape=(), dtype=float32)
tf.Tensor(0.18287037, shape=(), dtype=float32)
tf.Tensor(0.18231256, shape=(), dtype=float32)
tf.Tensor(0.18175781, shape=(), dtype=float32)
tf.Tensor(0.18120615, shape=(), dtype=float32)
tf.Tensor(0.18065754, shape=(), dtype=float32)
tf.Tensor(0.18011193, shape=(), dtype=float32)
tf.Tensor(0.17956935, shape=(), dtype=float32)
tf.Tensor(0.17902976, shape=(), dtype=float32)
tf.Tensor(0.17849308, shape=(), dtype=float32)
tf.Tensor(0.17795937, shape=(), dtype=float32)
tf.Tensor(0.17742859, shape=(), dtype=float32)
tf.Tensor(0.17690066, shape=(), dtype=float32)
tf.Tensor(0.17637561, shape=(), dtype=float32)

No exemplo acima, você iterou no dist_dataset para fornecer entrada ao seu treinamento. Você também recebe o tf.distribute.Strategy.make_experimental_numpy_dataset para suportar entradas NumPy. Você pode usar essa API para criar um conjunto de dados antes de chamar tf.distribute.Strategy.experimental_distribute_dataset .

Outra maneira de iterar sobre seus dados é usar iteradores explicitamente. Você pode querer fazer isso quando quiser executar um determinado número de etapas em vez de iterar em todo o conjunto de dados. A iteração acima agora seria modificada para primeiro criar um iterador e depois chamar explicitamente next nele para obter os dados de entrada.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32)
tf.Tensor(0.17533402, shape=(), dtype=float32)
tf.Tensor(0.17481743, shape=(), dtype=float32)
tf.Tensor(0.17430364, shape=(), dtype=float32)
tf.Tensor(0.17379259, shape=(), dtype=float32)
tf.Tensor(0.17328428, shape=(), dtype=float32)
tf.Tensor(0.17277871, shape=(), dtype=float32)
tf.Tensor(0.17227581, shape=(), dtype=float32)
tf.Tensor(0.17177561, shape=(), dtype=float32)
tf.Tensor(0.17127804, shape=(), dtype=float32)

Isso abrange o caso mais simples de usar a API tf.distribute.Strategy para distribuir loops de treinamento personalizados.

O que é suportado agora?

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Loop de treinamento personalizado Compatível Compatível Compatível Suporte experimental Suporte experimental

Exemplos e tutoriais

Aqui estão alguns exemplos de uso de estratégias de distribuição com loops de treinamento personalizados:

  1. Tutorial : Treinamento com um loop de treinamento personalizado e MirroredStrategy .
  2. Tutorial : Treinamento com um loop de treinamento personalizado e MultiWorkerMirroredStrategy .
  3. Guia : Contém um exemplo de um loop de treinamento personalizado com TPUStrategy .
  4. Tutorial : Treinamento do servidor de parâmetros com um loop de treinamento personalizado e ParameterServerStrategy .
  5. Repositório do TensorFlow Model Garden contendo coleções de modelos de última geração implementados usando várias estratégias.

Outros tópicos

Esta seção abrange alguns tópicos relevantes para vários casos de uso.

Configurando a variável de ambiente TF_CONFIG

Para treinamento de vários trabalhadores, conforme mencionado anteriormente, você precisa configurar a variável de ambiente 'TF_CONFIG' para cada binário em execução em seu cluster. A variável de ambiente 'TF_CONFIG' é uma string JSON que especifica quais tarefas constituem um cluster, seus endereços e a função de cada tarefa no cluster. O tensorflow/ecosystem fornece um modelo Kubernetes, que configura 'TF_CONFIG' para suas tarefas de treinamento.

Existem dois componentes de 'TF_CONFIG' : um cluster e uma tarefa.

  • Um cluster fornece informações sobre o cluster de treinamento, que é um dict que consiste em diferentes tipos de trabalhos, como trabalhadores. No treinamento de vários trabalhadores, geralmente há um trabalhador que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever o arquivo de resumo para o TensorBoard, além do que um trabalhador comum faz. Tal trabalhador é referido como o trabalhador "chefe", e é costume que o trabalhador com índice 0 seja apontado como o trabalhador chefe (na verdade, é assim que tf.distribute.Strategy é implementado).
  • Uma tarefa, por outro lado, fornece informações sobre a tarefa atual. O primeiro cluster de componentes é o mesmo para todos os trabalhadores e a segunda tarefa do componente é diferente em cada trabalhador e especifica o tipo e o índice desse trabalhador.

Um exemplo de 'TF_CONFIG' é:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Este 'TF_CONFIG' especifica que existem três trabalhadores e duas tarefas "ps" no "cluster" junto com seus hosts e portas. A parte "task" especifica a função da tarefa atual no "cluster" —trabalhador 1 (o segundo trabalhador). As funções válidas em um cluster são "chief" , "worker" , "ps" e "evaluator" . Não deve haver nenhum trabalho "ps" , exceto ao usar tf.distribute.experimental.ParameterServerStrategy .

Qual é o próximo?

tf.distribute.Strategy está ativamente em desenvolvimento. Experimente e forneça seus comentários usando os problemas do GitHub .