Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Antes de executar este notebook Colab, verifique se o acelerador de hardware é uma TPU verificando as configurações do notebook: Runtime > Change runtime type > Hardware accelerator > TPU .
Configurar
import tensorflow as tf
import os
import tensorflow_datasets as tfds
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.11) doesn't match a supported version! RequestsDependencyWarning)
Inicialização da TPU
Normalmente, as TPUs são trabalhadores do Cloud TPU, que são diferentes do processo local que executa o programa Python do usuário. Assim, você precisa fazer algum trabalho de inicialização para se conectar ao cluster remoto e inicializar as TPUs. Observe que o argumento tpu
para tf.distribute.cluster_resolver.TPUClusterResolver
é um endereço especial apenas para o Colab. Se você estiver executando seu código no Google Compute Engine (GCE), deverá passar o nome do Cloud TPU.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]
Posicionamento manual do dispositivo
Depois que a TPU for inicializada, você poderá usar o posicionamento manual do dispositivo para colocar o cálculo em um único dispositivo TPU:
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
c device: /job:worker/replica:0/task:0/device:TPU:0 tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32)
Estratégias de distribuição
Normalmente, você executa seu modelo em várias TPUs de maneira paralela de dados. Para distribuir seu modelo em várias TPUs (ou outros aceleradores), o TensorFlow oferece várias estratégias de distribuição. Você pode substituir sua estratégia de distribuição e o modelo será executado em qualquer dispositivo (TPU). Consulte o guia de estratégia de distribuição para obter mais informações.
Para demonstrar isso, crie um objeto tf.distribute.TPUStrategy
:
strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
Para replicar um cálculo para que ele possa ser executado em todos os núcleos da TPU, você pode passá-lo para a API strategy.run
. Abaixo está um exemplo que mostra todos os núcleos recebendo as mesmas entradas (a, b)
e realizando a multiplicação de matrizes em cada núcleo de forma independente. As saídas serão os valores de todas as réplicas.
@tf.function
def matmul_fn(x, y):
z = tf.matmul(x, y)
return z
z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{ 0: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 1: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 2: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 3: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 4: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 5: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 6: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 7: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32) }
Classificação em TPUs
Tendo coberto os conceitos básicos, considere um exemplo mais concreto. Esta seção demonstra como usar a estratégia de distribuição — tf.distribute.TPUStrategy
— para treinar um modelo Keras em um Cloud TPU.
Definir um modelo Keras
Comece com uma definição de um modelo Sequential
Keras para classificação de imagem no conjunto de dados MNIST usando Keras. Não é diferente do que você usaria se estivesse treinando em CPUs ou GPUs. Observe que a criação do modelo Keras precisa estar dentro do strategy.scope
, para que as variáveis possam ser criadas em cada dispositivo TPU. Outras partes do código não precisam estar dentro do escopo da estratégia.
def create_model():
return tf.keras.Sequential(
[tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(256, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)])
Carregar o conjunto de dados
O uso eficiente da API tf.data.Dataset
é fundamental ao usar uma Cloud TPU, pois é impossível usar as Cloud TPUs a menos que você possa alimentá-las com dados com rapidez suficiente. Você pode saber mais sobre o desempenho do conjunto de dados no guia de desempenho do pipeline de entrada .
Para todos os experimentos, exceto os mais simples (usando tf.data.Dataset.from_tensor_slices
ou outros dados no gráfico), você precisa armazenar todos os arquivos de dados lidos pelo conjunto de dados em buckets do Google Cloud Storage (GCS).
Para a maioria dos casos de uso, é recomendado converter seus dados no formato TFRecord
e usar um tf.data.TFRecordDataset
para lê-los. Verifique o tutorial TFRecord e tf.Example para obter detalhes sobre como fazer isso. Não é um requisito difícil e você pode usar outros leitores de conjunto de dados, como tf.data.FixedLengthRecordDataset
ou tf.data.TextLineDataset
.
Você pode carregar pequenos conjuntos de dados inteiros na memória usando tf.data.Dataset.cache
.
Independentemente do formato de dados usado, é altamente recomendável que você use arquivos grandes na ordem de 100 MB. Isso é especialmente importante nessa configuração de rede, pois a sobrecarga de abrir um arquivo é significativamente maior.
Conforme mostrado no código abaixo, você deve usar o módulo tensorflow_datasets
para obter uma cópia dos dados de treinamento e teste do MNIST. Observe que try_gcs
é especificado para usar uma cópia que está disponível em um bucket público do GCS. Se você não especificar isso, a TPU não poderá acessar os dados baixados.
def get_dataset(batch_size, is_training=True):
split = 'train' if is_training else 'test'
dataset, info = tfds.load(name='mnist', split=split, with_info=True,
as_supervised=True, try_gcs=True)
# Normalize the input data.
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255.0
return image, label
dataset = dataset.map(scale)
# Only shuffle and repeat the dataset in training. The advantage of having an
# infinite dataset for training is to avoid the potential last partial batch
# in each epoch, so that you don't need to think about scaling the gradients
# based on the actual batch size.
if is_training:
dataset = dataset.shuffle(10000)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
Treinar o modelo usando APIs de alto nível Keras
Você pode treinar seu modelo com o fit
de Keras e compile
APIs. Não há nada específico de TPU nesta etapa - você escreve o código como se estivesse usando várias GPUs e uma MirroredStrategy
em vez da TPUStrategy
. Você pode aprender mais no tutorial Treinamento distribuído com Keras .
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size
train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 18s 32ms/step - loss: 0.1433 - sparse_categorical_accuracy: 0.9564 - val_loss: 0.0452 - val_sparse_categorical_accuracy: 0.9859 Epoch 2/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0335 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0318 - val_sparse_categorical_accuracy: 0.9899 Epoch 3/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0199 - sparse_categorical_accuracy: 0.9935 - val_loss: 0.0397 - val_sparse_categorical_accuracy: 0.9866 Epoch 4/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0109 - sparse_categorical_accuracy: 0.9964 - val_loss: 0.0436 - val_sparse_categorical_accuracy: 0.9892 Epoch 5/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0103 - sparse_categorical_accuracy: 0.9963 - val_loss: 0.0481 - val_sparse_categorical_accuracy: 0.9881 <keras.callbacks.History at 0x7f0d485602e8>
Para reduzir a sobrecarga do Python e maximizar o desempenho de sua TPU, passe o argumento — steps_per_execution
— para Model.compile
. Neste exemplo, aumenta o rendimento em cerca de 50%:
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
# Anything between 2 and `steps_per_epoch` could help here.
steps_per_execution = 50,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 12s 41ms/step - loss: 0.1515 - sparse_categorical_accuracy: 0.9537 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9863 Epoch 2/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0366 - sparse_categorical_accuracy: 0.9891 - val_loss: 0.0410 - val_sparse_categorical_accuracy: 0.9875 Epoch 3/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0191 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0432 - val_sparse_categorical_accuracy: 0.9865 Epoch 4/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0447 - val_sparse_categorical_accuracy: 0.9875 Epoch 5/5 300/300 [==============================] - 3s 11ms/step - loss: 0.0093 - sparse_categorical_accuracy: 0.9968 - val_loss: 0.0426 - val_sparse_categorical_accuracy: 0.9884 <keras.callbacks.History at 0x7f0d0463cd68>
Treinar o modelo usando um loop de treinamento personalizado
Você também pode criar e treinar seu modelo usando as APIs tf.function
e tf.distribute
diretamente. Você pode usar a API strategy.experimental_distribute_datasets_from_function
para distribuir o conjunto de dados com uma função de conjunto de dados. Observe que, no exemplo abaixo, o tamanho do lote passado para o conjunto de dados é o tamanho do lote por réplica em vez do tamanho do lote global. Para saber mais, confira o tutorial Treinamento personalizado com tf.distribute.Strategy .
Primeiro, crie o modelo, conjuntos de dados e tf.functions:
# Create the model, optimizer and metrics inside the strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(
lambda _: get_dataset(per_replica_batch_size, is_training=True))
@tf.function
def train_step(iterator):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function
Em seguida, execute o loop de treinamento:
steps_per_eval = 10000 // batch_size
train_iterator = iter(train_dataset)
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
training_loss.reset_states()
training_accuracy.reset_states()
Epoch: 0/5 Current step: 300, training loss: 0.1339, accuracy: 95.79% Epoch: 1/5 Current step: 600, training loss: 0.0333, accuracy: 98.91% Epoch: 2/5 Current step: 900, training loss: 0.0176, accuracy: 99.43% Epoch: 3/5 Current step: 1200, training loss: 0.0126, accuracy: 99.61% Epoch: 4/5 Current step: 1500, training loss: 0.0122, accuracy: 99.61%
Melhorando o desempenho com várias etapas dentro tf.function
Você pode melhorar o desempenho executando várias etapas em um tf.function
. Isso é obtido envolvendo a chamada strategy.run
com um tf.range
dentro de tf.function
e o AutoGraph o converterá em um tf.while_loop
no trabalhador da TPU.
Apesar do desempenho aprimorado, existem compensações com esse método em comparação com a execução de uma única etapa dentro tf.function
. Executar várias etapas em um tf.function
é menos flexível - você não pode executar coisas ansiosamente ou código Python arbitrário dentro das etapas.
@tf.function
def train_multiple_steps(iterator, steps):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
for _ in tf.range(steps):
strategy.run(step_fn, args=(next(iterator),))
# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0081, accuracy: 99.74%
Próximos passos
- Documentação do Google Cloud TPU : como configurar e executar um Google Cloud TPU.
- Notebooks do Google Cloud TPU Colab : exemplos de treinamento de ponta a ponta.
- Guia de desempenho do Google Cloud TPU : melhore ainda mais o desempenho do Cloud TPU ajustando os parâmetros de configuração do Cloud TPU para seu aplicativo
- Treinamento distribuído com TensorFlow : como usar estratégias de distribuição — incluindo
tf.distribute.TPUStrategy
— com exemplos que mostram as práticas recomendadas.