Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
Tutorial ini menunjukkan cara menggunakan tf.distribute.Strategy
dengan loop pelatihan khusus. Kami akan melatih model CNN sederhana pada dataset fashion MNIST. Kumpulan data mode MNIST berisi 60.000 gambar kereta berukuran 28 x 28 dan 10.000 gambar uji berukuran 28 x 28.
Kami menggunakan loop pelatihan khusus untuk melatih model kami karena mereka memberi kami fleksibilitas dan kontrol yang lebih besar pada pelatihan. Selain itu, lebih mudah untuk men-debug model dan loop pelatihan.
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
Unduh kumpulan data mode MNIST
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Buat strategi untuk mendistribusikan variabel dan grafik
Bagaimana cara kerja strategi tf.distribute.MirroredStrategy
?
- Semua variabel dan model grafik direplikasi pada replika.
- Masukan didistribusikan secara merata di seluruh replika.
- Setiap replika menghitung kerugian dan gradien untuk input yang diterimanya.
- Gradien disinkronkan di semua replika dengan menjumlahkannya.
- Setelah sinkronisasi, pembaruan yang sama dilakukan pada salinan variabel di setiap replika.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
Siapkan pipa input
Ekspor grafik dan variabel ke format SavedModel platform-agnostik. Setelah model Anda disimpan, Anda dapat memuatnya dengan atau tanpa cakupan.
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
Buat kumpulan data dan distribusikan:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
2022-01-26 05:45:53.991501: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_UINT8 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } dim { size: 1 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } } 2022-01-26 05:45:54.034762: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_UINT8 } } } attr { key: "_cardinality" value { i: 10000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:3" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } dim { size: 1 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } }
Buat modelnya
Buat model menggunakan tf.keras.Sequential
. Anda juga dapat menggunakan Model Subclassing API untuk melakukan ini.
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
Tentukan fungsi kerugian
Biasanya, pada satu mesin dengan 1 GPU/CPU, kerugian dibagi dengan jumlah contoh dalam kumpulan input.
Jadi, bagaimana cara menghitung kerugian saat menggunakan tf.distribute.Strategy
?
Sebagai contoh, katakanlah Anda memiliki 4 GPU dan ukuran batch 64. Satu batch input didistribusikan ke seluruh replika (4 GPU), setiap replika mendapatkan input berukuran 16.
Model pada setiap replika melakukan forward pass dengan inputnya masing-masing dan menghitung kerugiannya. Sekarang, alih-alih membagi kerugian dengan jumlah contoh dalam input masing-masing (BATCH_SIZE_PER_REPLICA = 16), kerugian harus dibagi dengan GLOBAL_BATCH_SIZE (64).
Kenapa melakukan ini?
- Ini perlu dilakukan karena setelah gradien dihitung pada setiap replika, gradien disinkronkan di seluruh replika dengan menjumlahkannya .
Bagaimana cara melakukannya di TensorFlow?
Jika Anda menulis loop pelatihan khusus, seperti dalam tutorial ini, Anda harus menjumlahkan kerugian per contoh dan membagi jumlahnya dengan GLOBAL_BATCH_SIZE:
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
atau Anda dapat menggunakantf.nn.compute_average_loss
yang mengambil kerugian per contoh, bobot sampel opsional, dan GLOBAL_BATCH_SIZE sebagai argumen dan mengembalikan skala kerugian.Jika Anda menggunakan kerugian regularisasi dalam model Anda, maka Anda perlu menskalakan nilai kerugian dengan jumlah replika. Anda dapat melakukannya dengan menggunakan fungsi
tf.nn.scale_regularization_loss
.Menggunakan
tf.reduce_mean
tidak disarankan. Melakukannya akan membagi kerugian dengan ukuran batch aktual per replika yang dapat bervariasi dari langkah ke langkah.Pengurangan dan penskalaan ini dilakukan secara otomatis dalam keras
model.compile
danmodel.fit
Jika menggunakan kelas
tf.keras.losses
(seperti pada contoh di bawah), pengurangan kerugian harus secara eksplisit ditentukan menjadi salah satu dariNONE
atauSUM
.AUTO
danSUM_OVER_BATCH_SIZE
tidak diizinkan saat digunakan dengantf.distribute.Strategy
.AUTO
tidak diizinkan karena pengguna harus secara eksplisit memikirkan pengurangan apa yang mereka inginkan untuk memastikannya benar dalam kasus terdistribusi.SUM_OVER_BATCH_SIZE
tidak diizinkan karena saat ini hanya akan membagi dengan ukuran batch per replika, dan membiarkan pembagian dengan jumlah replika kepada pengguna, yang mungkin mudah dilewatkan. Jadi alih-alih kami meminta pengguna melakukan pengurangan sendiri secara eksplisit.Jika
labels
multidimensi, rata-rataper_example_loss
di seluruh jumlah elemen di setiap sampel. Misalnya, jika bentukpredictions
adalah(batch_size, H, W, n_classes)
danlabels
adalah(batch_size, H, W)
, Anda perlu memperbaruiper_example_loss
seperti:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
with strategy.scope():
# Set reduction to `none` so we can do the reduction afterwards and divide by
# global batch size.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
Tentukan metrik untuk melacak kehilangan dan akurasi
Metrik ini melacak kehilangan pengujian dan pelatihan serta akurasi pengujian. Anda dapat menggunakan .result()
untuk mendapatkan akumulasi statistik kapan saja.
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Lingkaran pelatihan
# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# TRAIN LOOP
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# TEST LOOP
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
Epoch 1, Loss: 0.5106383562088013, Accuracy: 81.77999877929688, Test Loss: 0.39399346709251404, Test Accuracy: 85.79000091552734 Epoch 2, Loss: 0.3362727463245392, Accuracy: 87.91333770751953, Test Loss: 0.35871225595474243, Test Accuracy: 86.7699966430664 Epoch 3, Loss: 0.2928692400455475, Accuracy: 89.2683334350586, Test Loss: 0.2999486029148102, Test Accuracy: 89.04000091552734 Epoch 4, Loss: 0.2605818510055542, Accuracy: 90.41999816894531, Test Loss: 0.28474125266075134, Test Accuracy: 89.47000122070312 Epoch 5, Loss: 0.23641237616539001, Accuracy: 91.32166290283203, Test Loss: 0.26421546936035156, Test Accuracy: 90.41000366210938 Epoch 6, Loss: 0.2192477434873581, Accuracy: 91.90499877929688, Test Loss: 0.2650589942932129, Test Accuracy: 90.4800033569336 Epoch 7, Loss: 0.20016911625862122, Accuracy: 92.66999816894531, Test Loss: 0.25025954842567444, Test Accuracy: 90.9000015258789 Epoch 8, Loss: 0.18381091952323914, Accuracy: 93.26499938964844, Test Loss: 0.2585820257663727, Test Accuracy: 90.95999908447266 Epoch 9, Loss: 0.1699329912662506, Accuracy: 93.67500305175781, Test Loss: 0.26234227418899536, Test Accuracy: 91.0199966430664 Epoch 10, Loss: 0.15756534039974213, Accuracy: 94.16333770751953, Test Loss: 0.25516414642333984, Test Accuracy: 90.93000030517578
Hal-hal yang perlu diperhatikan dalam contoh di atas:
- Kami mengulangi
train_dist_dataset
dantest_dist_dataset
menggunakan konstruksifor x in ...
- Kerugian yang diskalakan adalah nilai kembalian dari
distributed_train_step
. Nilai ini dikumpulkan di seluruh replika menggunakan panggilantf.distribute.Strategy.reduce
dan kemudian lintas batch dengan menjumlahkan nilai kembalian panggilantf.distribute.Strategy.reduce
. -
tf.keras.Metrics
harus diperbarui di dalamtrain_step
dantest_step
yang dieksekusi olehtf.distribute.Strategy.run
. *tf.distribute.Strategy.run
mengembalikan hasil dari setiap replika lokal dalam strategi, dan ada beberapa cara untuk menggunakan hasil ini. Anda dapat melakukantf.distribute.Strategy.reduce
untuk mendapatkan nilai agregat. Anda juga dapat melakukantf.distribute.Strategy.experimental_local_results
untuk mendapatkan daftar nilai yang terkandung dalam hasil, satu per replika lokal.
Kembalikan pos pemeriksaan dan tes terbaru
Sebuah model checkpoint dengan tf.distribute.Strategy
dapat dipulihkan dengan atau tanpa strategi.
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0199966430664
Cara alternatif untuk mengulangi kumpulan data
Menggunakan iterator
Jika Anda ingin mengulangi sejumlah langkah tertentu dan tidak melalui seluruh kumpulan data, Anda dapat membuat iterator menggunakan panggilan iter
dan panggilan eksplisit next
pada iterator. Anda dapat memilih untuk mengulangi dataset baik di dalam maupun di luar fungsi tf. Berikut adalah cuplikan kecil yang menunjukkan iterasi kumpulan data di luar tf.function menggunakan iterator.
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
Epoch 10, Loss: 0.17486707866191864, Accuracy: 93.4375 Epoch 10, Loss: 0.12386945635080338, Accuracy: 95.3125 Epoch 10, Loss: 0.16411852836608887, Accuracy: 93.90625 Epoch 10, Loss: 0.10728752613067627, Accuracy: 96.40625 Epoch 10, Loss: 0.11865834891796112, Accuracy: 95.625 Epoch 10, Loss: 0.12875251471996307, Accuracy: 95.15625 Epoch 10, Loss: 0.1189488023519516, Accuracy: 95.625 Epoch 10, Loss: 0.1456708014011383, Accuracy: 95.15625 Epoch 10, Loss: 0.12446556240320206, Accuracy: 95.3125 Epoch 10, Loss: 0.1380888819694519, Accuracy: 95.46875
Iterasi di dalam tf.function
Anda juga dapat mengulangi seluruh input train_dist_dataset
di dalam tf.function menggunakan konstruksi for x in ...
atau dengan membuat iterator seperti yang kita lakukan di atas. Contoh di bawah ini mendemonstrasikan membungkus satu epoch pelatihan dalam tf.function dan mengulangi train_dist_dataset
di dalam fungsi.
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " Epoch 1, Loss: 0.14398494362831116, Accuracy: 94.63999938964844 Epoch 2, Loss: 0.13246288895606995, Accuracy: 94.97333526611328 Epoch 3, Loss: 0.11922841519117355, Accuracy: 95.63833618164062 Epoch 4, Loss: 0.11084160208702087, Accuracy: 95.99333190917969 Epoch 5, Loss: 0.10420522093772888, Accuracy: 96.0816650390625 Epoch 6, Loss: 0.09215126931667328, Accuracy: 96.63500213623047 Epoch 7, Loss: 0.0878651961684227, Accuracy: 96.67666625976562 Epoch 8, Loss: 0.07854588329792023, Accuracy: 97.09333038330078 Epoch 9, Loss: 0.07217177003622055, Accuracy: 97.34833526611328 Epoch 10, Loss: 0.06753655523061752, Accuracy: 97.48999786376953
Melacak kehilangan pelatihan di seluruh replika
Kami tidak menyarankan penggunaan tf.metrics.Mean
untuk melacak kerugian pelatihan di berbagai replika, karena perhitungan penskalaan kerugian yang dilakukan.
Misalnya, jika Anda menjalankan pekerjaan pelatihan dengan karakteristik berikut:
- Dua replika
- Dua sampel diproses pada setiap replika
- Nilai kerugian yang dihasilkan: [2, 3] dan [4, 5] pada setiap replika
- Ukuran batch global = 4
Dengan penskalaan kerugian, Anda menghitung nilai kerugian per sampel pada setiap replika dengan menambahkan nilai kerugian, lalu membaginya dengan ukuran kumpulan global. Dalam hal ini: (2 + 3) / 4 = 1.25
dan (4 + 5) / 4 = 2.25
.
Jika Anda menggunakan tf.metrics.Mean
untuk melacak kerugian di kedua replika, hasilnya akan berbeda. Dalam contoh ini, Anda mendapatkan total
3,50 dan count
2, yang menghasilkan total
/ count
= 1,75 saat result()
dipanggil pada metrik. Kerugian yang dihitung dengan tf.keras.Metrics
diskalakan dengan faktor tambahan yang sama dengan jumlah replika yang disinkronkan.
Panduan dan contoh
Berikut adalah beberapa contoh untuk menggunakan strategi distribusi dengan loop pelatihan khusus:
- Panduan pelatihan terdistribusi
- Contoh DenseNet menggunakan
MirroredStrategy
. - Contoh BERT dilatih menggunakan
MirroredStrategy
danTPUStrategy
. Contoh ini sangat membantu untuk memahami cara memuat dari pos pemeriksaan dan menghasilkan pos pemeriksaan berkala selama pelatihan terdistribusi, dll. - Contoh NCF dilatih menggunakan
MirroredStrategy
yang dapat diaktifkan menggunakan flagkeras_use_ctl
. - Contoh NMT dilatih menggunakan
MirroredStrategy
.
Contoh lainnya tercantum dalam panduan strategi Distribusi .
Langkah selanjutnya
- Coba
tf.distribute.Strategy
API baru pada model Anda. - Kunjungi bagian Performa dalam panduan untuk mempelajari lebih lanjut tentang strategi dan fitur lain yang dapat Anda gunakan untuk mengoptimalkan performa model TensorFlow Anda.