Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
As APIs tf.distribute fornecem uma maneira fácil para os usuários dimensionarem seu treinamento de uma única máquina para várias máquinas. Ao dimensionar seu modelo, os usuários também precisam distribuir sua entrada em vários dispositivos. tf.distribute
fornece APIs com as quais você pode distribuir automaticamente sua entrada entre os dispositivos.
Este guia mostrará as diferentes maneiras pelas quais você pode criar conjuntos de dados distribuídos e iteradores usando APIs tf.distribute
. Além disso, serão abordados os seguintes tópicos:
- Opções de uso, fragmentação e lote ao usar
tf.distribute.Strategy.experimental_distribute_dataset
etf.distribute.Strategy.distribute_datasets_from_function
. - Diferentes maneiras de iterar no conjunto de dados distribuído.
- Diferenças entre as APIs
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
e as APIstf.data
, bem como quaisquer limitações que os usuários possam encontrar em seu uso.
Este guia não cobre o uso de entrada distribuída com APIs Keras.
Conjuntos de dados distribuídos
Para usar APIs tf.distribute
para dimensionar, é recomendável que os usuários usem tf.data.Dataset
para representar sua entrada. tf.distribute
foi feito para funcionar eficientemente com tf.data.Dataset
(por exemplo, pré-busca automática de dados em cada dispositivo acelerador) com otimizações de desempenho sendo incorporadas regularmente à implementação. Se você tiver um caso de uso para usar algo diferente de tf.data.Dataset
, consulte uma seção posterior deste guia. Em um loop de treinamento não distribuído, os usuários primeiro criam uma instância tf.data.Dataset
e, em seguida, iteram sobre os elementos. Por exemplo:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Para permitir que os usuários usem a estratégia tf.distribute
com alterações mínimas no código existente de um usuário, foram introduzidas duas APIs que distribuiriam uma instância tf.data.Dataset
e retornariam um objeto de conjunto de dados distribuído. Um usuário pode então iterar sobre essa instância de conjunto de dados distribuído e treinar seu modelo como antes. Vejamos agora as duas APIs - tf.distribute.Strategy.experimental_distribute_dataset
e tf.distribute.Strategy.distribute_datasets_from_function
com mais detalhes:
tf.distribute.Strategy.experimental_distribute_dataset
Uso
Essa API usa uma instância tf.data.Dataset
como entrada e retorna uma instância tf.distribute.DistributedDataset
. Você deve agrupar o conjunto de dados de entrada com um valor igual ao tamanho global do lote. Esse tamanho de lote global é o número de amostras que você deseja processar em todos os dispositivos em uma etapa. Você pode iterar sobre esse conjunto de dados distribuído de maneira Pythonic ou criar um iterador usando iter
. O objeto retornado não é uma instância tf.data.Dataset
e não oferece suporte a nenhuma outra API que transforme ou inspecione o conjunto de dados de qualquer forma. Essa é a API recomendada se você não tiver maneiras específicas de fragmentar sua entrada em diferentes réplicas.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Propriedades
Lote
tf.distribute
rebate a instância tf.data.Dataset
de entrada com um novo tamanho de lote que é igual ao tamanho do lote global dividido pelo número de réplicas em sincronia. O número de réplicas em sincronia é igual ao número de dispositivos que estão participando do gradiente reduzido durante o treinamento. Quando um usuário chama next
no iterador distribuído, um tamanho de lote de dados por réplica é retornado em cada réplica. A cardinalidade do conjunto de dados rebatizado sempre será um múltiplo do número de réplicas. Aqui estão alguns exemplos:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Sem distribuição:
- Lote 1: [0, 1, 2, 3]
- Lote 2: [4, 5]
Com distribuição em 2 réplicas. O último lote ([4, 5]) é dividido entre 2 réplicas.
Lote 1:
- Réplica 1:[0, 1]
- Réplica 2:[2, 3]
Lote 2:
- Réplica 2: [4]
- Réplica 2: [5]
tf.data.Dataset.range(4).batch(4)
- Sem distribuição:
- Lote 1: [[0], [1], [2], [3]]
- Com distribuição em 5 réplicas:
- Lote 1:
- Réplica 1: [0]
- Réplica 2: [1]
- Réplica 3: [2]
- Réplica 4: [3]
- Réplica 5: []
tf.data.Dataset.range(8).batch(4)
- Sem distribuição:
- Lote 1: [0, 1, 2, 3]
- Lote 2: [4, 5, 6, 7]
- Com distribuição em 3 réplicas:
- Lote 1:
- Réplica 1: [0, 1]
- Réplica 2: [2, 3]
- Réplica 3: []
- Lote 2:
- Réplica 1: [4, 5]
- Réplica 2: [6, 7]
- Réplica 3: []
O rebatch do conjunto de dados tem uma complexidade de espaço que aumenta linearmente com o número de réplicas. Isso significa que, para o caso de uso de treinamento de vários trabalhadores, o pipeline de entrada pode apresentar erros OOM.
Fragmentação
tf.distribute
também fragmenta automaticamente o conjunto de dados de entrada no treinamento de vários trabalhadores com MultiWorkerMirroredStrategy
e TPUStrategy
. Cada conjunto de dados é criado no dispositivo de CPU do trabalhador. A fragmentação automática de um conjunto de dados em um conjunto de trabalhadores significa que cada trabalhador recebe um subconjunto de todo o conjunto de dados (se o tf.data.experimental.AutoShardPolicy
correto estiver definido). Isso é para garantir que, em cada etapa, um tamanho de lote global de elementos de conjunto de dados não sobrepostos seja processado por cada trabalhador. Autosharding tem algumas opções diferentes que podem ser especificadas usando tf.data.experimental.DistributeOptions
. Observe que não há autosharding no treinamento de vários trabalhadores com ParameterServerStrategy
, e mais informações sobre a criação de conjuntos de dados com essa estratégia podem ser encontradas no tutorial Parameter Server Strategy .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Existem três opções diferentes que você pode definir para tf.data.experimental.AutoShardPolicy
:
- AUTO: Esta é a opção padrão, o que significa que uma tentativa de fragmentação será feita por FILE. A tentativa de fragmentação por FILE falhará se um conjunto de dados baseado em arquivo não for detectado.
tf.distribute
então retornará ao sharding por DATA. Observe que, se o conjunto de dados de entrada for baseado em arquivo, mas o número de arquivos for menor que o número de trabalhadores, umInvalidArgumentError
será gerado. Se isso acontecer, defina explicitamente a política comoAutoShardPolicy.DATA
ou divida sua fonte de entrada em arquivos menores, de modo que o número de arquivos seja maior que o número de trabalhadores. FILE: Esta é a opção se você deseja fragmentar os arquivos de entrada em todos os workers. Você deve usar esta opção se o número de arquivos de entrada for muito maior que o número de trabalhadores e os dados nos arquivos estiverem distribuídos uniformemente. A desvantagem dessa opção é ter trabalhadores ociosos se os dados nos arquivos não forem distribuídos uniformemente. Se o número de arquivos for menor que o número de trabalhadores, um
InvalidArgumentError
será gerado. Se isso acontecer, defina explicitamente a política comoAutoShardPolicy.DATA
. Por exemplo, vamos distribuir 2 arquivos por 2 workers com 1 réplica cada. O arquivo 1 contém [0, 1, 2, 3, 4, 5] e o arquivo 2 contém [6, 7, 8, 9, 10, 11]. Deixe o número total de réplicas sincronizadas ser 2 e o tamanho global do lote seja 4.- Trabalhador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [2, 3]
- Lote 3 = Réplica 1: [4]
- Lote 4 = Réplica 1: [5]
- Trabalhador 1:
- Lote 1 = Réplica 2: [6, 7]
- Lote 2 = Réplica 2: [8, 9]
- Lote 3 = Réplica 2: [10]
- Lote 4 = Réplica 2: [11]
DATA: Isso fará a fragmentação automática dos elementos em todos os trabalhadores. Cada um dos trabalhadores lerá todo o conjunto de dados e processará apenas o fragmento atribuído a ele. Todos os outros fragmentos serão descartados. Isso geralmente é usado se o número de arquivos de entrada for menor que o número de trabalhadores e você quiser uma melhor fragmentação de dados em todos os trabalhadores. A desvantagem é que todo o conjunto de dados será lido em cada trabalhador. Por exemplo, vamos distribuir 1 arquivo por 2 trabalhadores. O arquivo 1 contém [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Deixe o número total de réplicas em sincronia ser 2.
- Trabalhador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [4, 5]
- Lote 3 = Réplica 1: [8, 9]
- Trabalhador 1:
- Lote 1 = Réplica 2: [2, 3]
- Lote 2 = Réplica 2: [6, 7]
- Lote 3 = Réplica 2: [10, 11]
DESATIVADO: se você desativar o autosharding, cada trabalhador processará todos os dados. Por exemplo, vamos distribuir 1 arquivo por 2 trabalhadores. O arquivo 1 contém [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Deixe o número total de réplicas sincronizadas ser 2. Em seguida, cada trabalhador verá a seguinte distribuição:
- Trabalhador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [2, 3]
- Lote 3 = Réplica 1: [4, 5]
- Lote 4 = Réplica 1: [6, 7]
- Lote 5 = Réplica 1: [8, 9]
Lote 6 = Réplica 1: [10, 11]
Trabalhador 1:
Lote 1 = Réplica 2: [0, 1]
Lote 2 = Réplica 2: [2, 3]
Lote 3 = Réplica 2: [4, 5]
Lote 4 = Réplica 2: [6, 7]
Lote 5 = Réplica 2: [8, 9]
Lote 6 = Réplica 2: [10, 11]
Pré-busca
Por padrão, tf.distribute
adiciona uma transformação de pré-busca no final da instância tf.data.Dataset
fornecida pelo usuário. O argumento para a transformação de pré-busca que é buffer_size
é igual ao número de réplicas em sincronia.
tf.distribute.Strategy.distribute_datasets_from_function
Uso
Essa API usa uma função de entrada e retorna uma instância tf.distribute.DistributedDataset
. A função de entrada que os usuários passam tem um argumento tf.distribute.InputContext
e deve retornar uma instância tf.data.Dataset
. Com essa API, tf.distribute
não faz mais alterações na instância tf.data.Dataset
do usuário retornada da função de entrada. É responsabilidade do usuário agrupar e fragmentar o conjunto de dados. tf.distribute
chama a função de entrada no dispositivo de CPU de cada um dos trabalhadores. Além de permitir que os usuários especifiquem sua própria lógica de lote e fragmentação, essa API também demonstra melhor escalabilidade e desempenho em comparação com tf.distribute.Strategy.experimental_distribute_dataset
quando usada para treinamento de vários trabalhadores.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Propriedades
Lote
A instância tf.data.Dataset
que é o valor de retorno da função de entrada deve ser agrupada usando o tamanho de lote por réplica. O tamanho do lote por réplica é o tamanho do lote global dividido pelo número de réplicas que estão participando do treinamento de sincronização. Isso ocorre porque o tf.distribute
chama a função de entrada no dispositivo da CPU de cada um dos trabalhadores. O conjunto de dados criado em um determinado trabalhador deve estar pronto para uso por todas as réplicas desse trabalhador.
Fragmentação
O objeto tf.distribute.InputContext
que é passado implicitamente como um argumento para a função de entrada do usuário é criado por tf.distribute
sob o capô. Ele tem informações sobre o número de trabalhadores, id de trabalhador atual, etc. Esta função de entrada pode lidar com sharding de acordo com as políticas definidas pelo usuário usando essas propriedades que fazem parte do objeto tf.distribute.InputContext
.
Pré-busca
tf.distribute
não adiciona uma transformação de pré-busca no final do tf.data.Dataset
retornado pela função de entrada fornecida pelo usuário.
Iteradores distribuídos
Semelhante às instâncias tf.data.Dataset
não distribuídas, você precisará criar um iterador nas instâncias tf.distribute.DistributedDataset
para iterar sobre ela e acessar os elementos no tf.distribute.DistributedDataset
. A seguir estão as maneiras pelas quais você pode criar um tf.distribute.DistributedIterator
e usá-lo para treinar seu modelo:
Usos
Use uma construção de loop Pythonic for
Você pode usar um loop Pythonic amigável para iterar sobre o tf.distribute.DistributedDataset
. Os elementos retornados do tf.distribute.DistributedIterator
podem ser um único tf.Tensor
ou um tf.distribute.DistributedValues
que contém um valor por réplica. Colocar o loop dentro de um tf.function
aumentará o desempenho. No entanto, break
e return
não são suportados atualmente para um loop em um tf.distribute.DistributedDataset
que é colocado dentro de um tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Use iter
para criar um iterador explícito
Para iterar sobre os elementos em uma instância tf.distribute.DistributedDataset
, você pode criar um tf.distribute.DistributedIterator
usando a API iter
nele. Com um iterador explícito, você pode iterar por um número fixo de etapas. Para obter o próximo elemento de uma instância tf.distribute.DistributedIterator
dist_iterator
, você pode chamar next(dist_iterator)
, dist_iterator.get_next()
ou dist_iterator.get_next_as_optional()
. Os dois primeiros são essencialmente os mesmos:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Com next()
ou tf.distribute.DistributedIterator.get_next()
, se o tf.distribute.DistributedIterator
chegar ao fim, um erro OutOfRange será lançado. O cliente pode pegar o erro no lado do python e continuar fazendo outros trabalhos, como checkpoints e avaliações. No entanto, isso não funcionará se você estiver usando um loop de treinamento de host (ou seja, execute várias etapas por tf.function
), que se parece com:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
contém várias etapas envolvendo o corpo da etapa dentro de um tf.range
. Nesse caso, diferentes iterações no loop sem dependência podem iniciar em paralelo, portanto, um erro OutOfRange pode ser acionado em iterações posteriores antes que o cálculo das iterações anteriores termine. Assim que um erro OutOfRange for lançado, todas as operações na função serão encerradas imediatamente. Se este for um caso que você gostaria de evitar, uma alternativa que não gera um erro OutOfRange é tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
retorna um tf.experimental.Optional
que contém o próximo elemento ou nenhum valor se o tf.distribute.DistributedIterator
chegou ao fim.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Usando a propriedade element_spec
Se você passar os elementos de um conjunto de dados distribuído para um tf.function
e desejar uma garantia tf.TypeSpec
, poderá especificar o argumento input_signature
do tf.function
. A saída de um conjunto de dados distribuído é tf.distribute.DistributedValues
que pode representar a entrada para um único dispositivo ou vários dispositivos. Para obter o tf.TypeSpec
correspondente a esse valor distribuído, você pode usar a propriedade element_spec
do conjunto de dados distribuído ou objeto iterador distribuído.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Lotes Parciais
Lotes parciais são encontrados quando as instâncias tf.data.Dataset
que os usuários criam podem conter tamanhos de lote que não são divisíveis uniformemente pelo número de réplicas ou quando a cardinalidade da instância do conjunto de dados não é divisível pelo tamanho do lote. Isso significa que quando o conjunto de dados é distribuído em várias réplicas, a next
chamada em alguns iteradores resultará em um OutOfRangeError. Para lidar com esse caso de uso, tf.distribute
retorna lotes fictícios de tamanho de lote 0 em réplicas que não têm mais dados para processar.
Para o caso de trabalhador único, se os dados não forem retornados pela next
chamada no iterador, lotes fictícios de tamanho de lote 0 serão criados e usados junto com os dados reais no conjunto de dados. No caso de lotes parciais, o último lote global de dados conterá dados reais ao lado de lotes fictícios de dados. A condição de parada para processamento de dados agora verifica se alguma das réplicas possui dados. Se não houver dados em nenhuma das réplicas, um erro OutOfRange será lançado.
Para o caso de vários trabalhadores, o valor booleano que representa a presença de dados em cada um dos trabalhadores é agregado usando comunicação de réplica cruzada e isso é usado para identificar se todos os trabalhadores concluíram o processamento do conjunto de dados distribuído. Como isso envolve comunicação entre trabalhadores, há alguma penalidade de desempenho envolvida.
Ressalvas
Ao usar as APIs
tf.distribute.Strategy.experimental_distribute_dataset
com uma configuração de vários trabalhadores, os usuários passam umtf.data.Dataset
que lê arquivos. Setf.data.experimental.AutoShardPolicy
estiver definido comoAUTO
ouFILE
, o tamanho real do lote por etapa poderá ser menor do que o tamanho do lote global definido pelo usuário. Isso pode acontecer quando os elementos restantes no arquivo são menores que o tamanho global do lote. Os usuários podem esgotar o conjunto de dados sem depender do número de etapas a serem executadas ou definirtf.data.experimental.AutoShardPolicy
comoDATA
para contornar isso.No momento, as transformações de conjunto de dados com estado não são compatíveis com
tf.distribute
e quaisquer operações com estado que o conjunto de dados possa ter são ignoradas no momento. Por exemplo, se seu conjunto de dados tem ummap_fn
que usatf.random.uniform
para girar uma imagem, então você tem um gráfico de conjunto de dados que depende do estado (ou seja, a semente aleatória) na máquina local onde o processo python está sendo executado.As
tf.data.experimental.OptimizationOptions
experimentais que são desabilitadas por padrão podem em certos contextos -- como quando usadas junto comtf.distribute
-- causar uma degradação de desempenho. Você só deve habilitá-los depois de validar que eles beneficiam o desempenho de sua carga de trabalho em uma configuração de distribuição.Consulte este guia para saber como otimizar seu pipeline de entrada com
tf.data
em geral. Algumas dicas adicionais:Se você tiver vários trabalhadores e estiver usando
tf.data.Dataset.list_files
para criar um conjunto de dados de todos os arquivos que correspondam a um ou mais padrões glob, lembre-se de definir o argumentoseed
ou definashuffle=False
para que cada trabalhador fragmente o arquivo de forma consistente.Se o pipeline de entrada incluir tanto o embaralhamento dos dados no nível do registro quanto a análise dos dados, a menos que os dados não analisados sejam significativamente maiores do que os dados analisados (o que geralmente não é o caso), embaralhe primeiro e depois analise, conforme mostrado no exemplo a seguir. Isso pode beneficiar o uso e o desempenho da memória.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
mantém um buffer interno de elementosbuffer_size
e, assim, reduzirbuffer_size
pode aliviar o problema de OOM.A ordem em que os dados são processados pelos trabalhadores ao usar
tf.distribute.experimental_distribute_dataset
outf.distribute.distribute_datasets_from_function
não é garantida. Isso geralmente é necessário se você estiver usandotf.distribute
para dimensionar a previsão. No entanto, você pode inserir um índice para cada elemento no lote e ordenar as saídas de acordo. O snippet a seguir é um exemplo de como ordenar saídas.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Como distribuo meus dados se não estiver usando uma instância tf.data.Dataset canônica?
Às vezes, os usuários não podem usar um tf.data.Dataset
para representar sua entrada e, posteriormente, as APIs mencionadas acima para distribuir o conjunto de dados para vários dispositivos. Nesses casos, você pode usar tensores brutos ou entradas de um gerador.
Use experimental_distribute_values_from_function para entradas de tensor arbitrárias
strategy.run
aceita tf.distribute.DistributedValues
que é a saída de next(iterator)
. Para passar os valores do tensor, use experimental_distribute_values_from_function
para construir tf.distribute.DistributedValues
a partir de tensores brutos.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Use tf.data.Dataset.from_generator se sua entrada for de um gerador
Se você tiver uma função geradora que deseja usar, poderá criar uma instância tf.data.Dataset
usando a API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.