Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
Ringkasan
Tutorial ini mendemonstrasikan pelatihan multi-pekerja dengan API loop pelatihan khusus, yang didistribusikan melalui MultiWorkerMirroredStrategy, sehingga model Keras yang dirancang untuk dijalankan pada pekerja tunggal dapat bekerja dengan mulus pada banyak pekerja dengan perubahan kode minimal.
Kami menggunakan loop pelatihan khusus untuk melatih model kami karena mereka memberi kami fleksibilitas dan kontrol yang lebih besar pada pelatihan. Selain itu, lebih mudah untuk men-debug model dan loop pelatihan. Informasi lebih rinci tersedia di Menulis loop pelatihan dari awal .
Jika Anda mencari cara menggunakan MultiWorkerMirroredStrategy
dengan keras model.fit
, lihat tutorial ini sebagai gantinya.
Pelatihan Terdistribusi dalam panduan TensorFlow tersedia untuk ikhtisar tentang strategi distribusi yang didukung TensorFlow bagi mereka yang tertarik untuk memahami lebih dalam tentang tf.distribute.Strategy
API.
Mempersiapkan
Pertama, beberapa impor yang diperlukan.
import json
import os
import sys
Sebelum mengimpor TensorFlow, buat beberapa perubahan pada lingkungan.
Nonaktifkan semua GPU. Ini mencegah kesalahan yang disebabkan oleh semua pekerja yang mencoba menggunakan GPU yang sama. Untuk aplikasi nyata setiap pekerja akan berada di mesin yang berbeda.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Setel ulang variabel lingkungan TF_CONFIG
, Anda akan melihat lebih banyak tentang ini nanti.
os.environ.pop('TF_CONFIG', None)
Pastikan direktori saat ini berada di jalur python. Ini memungkinkan notebook untuk mengimpor file yang ditulis oleh %%writefile
nanti.
if '.' not in sys.path:
sys.path.insert(0, '.')
Sekarang impor TensorFlow.
import tensorflow as tf
Definisi set data dan model
Selanjutnya buat file mnist.py
dengan model sederhana dan pengaturan dataset. File python ini akan digunakan oleh proses pekerja dalam tutorial ini:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Konfigurasi Multi-Pekerja
Sekarang mari kita memasuki dunia pelatihan multi-pekerja. Di TensorFlow, variabel lingkungan TF_CONFIG
diperlukan untuk pelatihan di beberapa mesin, yang masing-masing mungkin memiliki peran yang berbeda. TF_CONFIG
digunakan di bawah ini, adalah string JSON yang digunakan untuk menentukan konfigurasi cluster pada setiap pekerja yang merupakan bagian dari cluster. Ini adalah metode default untuk menentukan cluster, menggunakan cluster_resolver.TFConfigClusterResolver
, tetapi ada opsi lain yang tersedia di distribute.cluster_resolver
.
Deskripsikan cluster Anda
Berikut adalah contoh konfigurasi:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Ini adalah serial TF_CONFIG
yang sama dengan string JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Ada dua komponen TF_CONFIG
: cluster
dan task
.
cluster
adalah sama untuk semua pekerja dan memberikan informasi tentang cluster pelatihan, yang merupakan dict yang terdiri dari berbagai jenis pekerjaan sepertiworker
. Dalam pelatihan multi-pekerja denganMultiWorkerMirroredStrategy
, biasanya ada satuworker
yang mengambil sedikit tanggung jawab seperti menyimpan pos pemeriksaan dan menulis file ringkasan untuk TensorBoard selain apa yang dilakukanworker
biasa. Pekerja seperti itu disebut sebagai pekerjachief
, dan merupakan kebiasaan bahwaworker
denganindex
0 ditunjuk sebagaiworker
utama (sebenarnya beginilah caratf.distribute.Strategy
diterapkan).task
memberikan informasi tentang tugas saat ini dan berbeda pada setiap pekerja. Ini menentukantype
danindex
pekerja itu.
Dalam contoh ini, Anda mengatur type
tugas ke "worker"
dan index
tugas ke 0
. Mesin ini adalah pekerja pertama dan akan diangkat sebagai kepala pekerja dan melakukan lebih banyak pekerjaan daripada yang lain. Perhatikan bahwa mesin lain harus memiliki variabel lingkungan TF_CONFIG
yang disetel juga, dan itu harus memiliki dict cluster
yang sama, tetapi type
tugas atau index
tugas yang berbeda tergantung pada peran mesin tersebut.
Untuk tujuan ilustrasi, tutorial ini menunjukkan bagaimana seseorang dapat mengatur TF_CONFIG
dengan 2 pekerja di localhost
. Dalam praktiknya, pengguna akan membuat banyak pekerja pada alamat/port IP eksternal, dan menyetel TF_CONFIG
pada setiap pekerja dengan tepat.
Dalam contoh ini Anda akan menggunakan 2 pekerja, TF_CONFIG
pekerja pertama ditampilkan di atas. Untuk pekerja kedua Anda akan mengatur tf_config['task']['index']=1
Di atas, tf_config
hanyalah variabel lokal di python. Untuk benar-benar menggunakannya untuk mengonfigurasi pelatihan, kamus ini perlu diserialisasikan sebagai JSON, dan ditempatkan di variabel lingkungan TF_CONFIG
.
Variabel lingkungan dan subproses di notebook
Subproses mewarisi variabel lingkungan dari induknya. Jadi, jika Anda menetapkan variabel lingkungan dalam proses jupyter notebook
ini:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Anda dapat mengakses variabel lingkungan dari subproses:
echo ${GREETINGS}
Hello TensorFlow!
Di bagian berikutnya, Anda akan menggunakan ini untuk meneruskan TF_CONFIG
ke subproses pekerja. Anda tidak akan pernah benar-benar meluncurkan pekerjaan Anda dengan cara ini, tetapi itu sudah cukup untuk tujuan tutorial ini: Untuk mendemonstrasikan contoh multi-pekerja minimal.
MultiWorkerMirroredStrategy
Untuk melatih model, gunakan instance tf.distribute.MultiWorkerMirroredStrategy
, yang membuat salinan semua variabel dalam lapisan model pada setiap perangkat di semua pekerja. Panduan tf.distribute.Strategy
memiliki rincian lebih lanjut tentang strategi ini.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Gunakan tf.distribute.Strategy.scope
untuk menentukan bahwa strategi harus digunakan saat membangun model Anda. Ini menempatkan Anda dalam " konteks replika silang " untuk strategi ini, yang berarti strategi tersebut mengendalikan hal-hal seperti penempatan variabel.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Auto-sharding data Anda ke seluruh pekerja
Dalam pelatihan multi-pekerja, sharding kumpulan data tidak selalu diperlukan, namun ini memberi Anda semantik tepat satu kali yang membuat lebih banyak pelatihan lebih dapat direproduksi, yaitu pelatihan pada banyak pekerja harus sama dengan pelatihan pada satu pekerja. Catatan: kinerja dapat terpengaruh dalam beberapa kasus.
Lihat: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Tentukan Loop Pelatihan Kustom dan Latih modelnya
Tentukan pengoptimal
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Tentukan langkah pelatihan dengan tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Menyimpan dan memulihkan pos pemeriksaan
Implementasi pos pemeriksaan dalam Loop Pelatihan Kustom mengharuskan pengguna untuk menanganinya alih-alih menggunakan panggilan balik yang keras. Ini memungkinkan Anda untuk menyimpan bobot model dan mengembalikannya tanpa harus menyimpan keseluruhan model.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Di sini, Anda akan membuat satu tf.train.Checkpoint
yang melacak model, yang dikelola oleh tf.train.CheckpointManager
sehingga hanya pos pemeriksaan terbaru yang dipertahankan.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Sekarang, ketika Anda perlu memulihkan, Anda dapat menemukan pos pemeriksaan terbaru yang disimpan menggunakan fungsi tf.train.latest_checkpoint
yang nyaman.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
Setelah memulihkan pos pemeriksaan, Anda dapat melanjutkan dengan melatih loop pelatihan khusus Anda.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Penyiapan kode lengkap pada pekerja
Untuk benar-benar berjalan dengan MultiWorkerMirroredStrategy
Anda harus menjalankan proses pekerja dan meneruskan TF_CONFIG
kepada mereka.
Seperti file mnist.py
yang ditulis sebelumnya, berikut ini adalah main.py
yang berisi kode yang sama dengan yang kita jalani langkah demi langkah sebelumnya di colab ini, kita hanya menulisnya ke file sehingga masing-masing pekerja akan menjalankannya:
File: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Latih dan Evaluasi
Direktori saat ini sekarang berisi kedua file Python:
ls *.py
main.py mnist.py
Jadi json-serialize TF_CONFIG
dan tambahkan ke variabel lingkungan:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Sekarang, Anda dapat meluncurkan proses pekerja yang akan menjalankan main.py
dan menggunakan TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Ada beberapa hal yang perlu diperhatikan tentang perintah di atas:
- Ia menggunakan
%%bash
yang merupakan "ajaib" notebook untuk menjalankan beberapa perintah bash. - Ia menggunakan flag
--bg
untuk menjalankan prosesbash
di latar belakang, karena pekerja ini tidak akan berhenti. Itu menunggu semua pekerja sebelum dimulai.
Proses pekerja di latar belakang tidak akan mencetak output ke notebook ini, jadi &>
mengarahkan outputnya ke file, sehingga Anda dapat melihat apa yang terjadi.
Jadi, tunggu beberapa detik hingga proses dimulai:
import time
time.sleep(20)
Sekarang lihat apa yang telah dihasilkan ke file log pekerja sejauh ini:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Baris terakhir dari file log harus mengatakan: Started server with target: grpc://localhost:12345
. Pekerja pertama sekarang siap, dan sedang menunggu semua pekerja lain siap untuk melanjutkan.
Jadi perbarui tf_config
untuk proses pekerja kedua untuk mengambil:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Sekarang luncurkan pekerja kedua. Ini akan memulai pelatihan karena semua pekerja aktif (jadi proses ini tidak perlu dilatar belakangi):
python main.py > /dev/null 2>&1
Sekarang jika Anda memeriksa ulang log yang ditulis oleh pekerja pertama, Anda akan melihat bahwa ia berpartisipasi dalam pelatihan model itu:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Pelatihan multi pekerja secara mendalam
Tutorial ini telah mendemonstrasikan alur kerja Custom Training Loop
dari pengaturan multi-pekerja. Penjelasan rinci tentang topik lain tersedia di model.fit's guide
pengaturan multi-pekerja dan berlaku untuk CTL.
Lihat juga
- Pelatihan Terdistribusi dalam panduan TensorFlow memberikan gambaran umum tentang strategi distribusi yang tersedia.
- Model resmi , banyak di antaranya dapat dikonfigurasi untuk menjalankan beberapa strategi distribusi.
- Bagian Kinerja dalam panduan memberikan informasi tentang strategi dan alat lain yang dapat Anda gunakan untuk mengoptimalkan kinerja model TensorFlow Anda.