This guide demonstrates how to migrate your multi-worker distributed training workflow from TensorFlow 1 to TensorFlow 2.
To perform multi-worker training with CPUs/GPUs:
- In TensorFlow 1, you traditionally use the tf.estimator.train_and_evaluate and tf.estimator.Estimator APIs.
- In TensorFlow 2, use the Keras APIs for writing the model, the loss function, the optimizer, and metrics. Then, distribute the training with the Keras Model.fit API or a custom training loop (with tf.GradientTape) across multiple workers with tf.distribute.experimental.ParameterServerStrategy or tf.distribute.MultiWorkerMirroredStrategy. For more details, refer to the tutorials listed in the Next steps section at the end of this guide.
Setup
Start with some necessary imports and a simple dataset for demonstration purposes:
# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]
You will need the 'TF_CONFIG' configuration environment variable for training on multiple machines in TensorFlow. Use 'TF_CONFIG' to specify the 'cluster' and the 'task' addresses. (Learn more in the Distributed training guide.)
import json
import os
tf_config = {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Use the del statement to remove the variable (but in real-world multi-worker training in TensorFlow 1, you won't have to do this):
del os.environ['TF_CONFIG']
TensorFlow 1: Multi-worker distributed training with tf.estimator APIs
The following code snippet demonstrates the canonical workflow of multi-worker training in TF1: you will use a tf.estimator.Estimator, a tf.estimator.TrainSpec, a tf.estimator.EvalSpec, and the tf.estimator.train_and_evaluate API to distribute the training:
def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)
def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)
def _model_fn(features, labels, mode):
logits = tf1.layers.Dense(1)(features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config. WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5 INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version. Instructions for updating: Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts. INFO:tensorflow:Calling model_fn. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version. Instructions for updating: Call initializer instance with the dtype argument instead of passing it to the constructor INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:loss = 0.038075272, step = 0 INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3... INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Inference Time : 0.13630s INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06 INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3 INFO:tensorflow:Loss for final step: 0.061832994. ({'loss': 0.005215075, 'global_step': 3}, [])
TensorFlow 2: Multi-worker training with distribution strategies
In TensorFlow 2, distributed training across multiple workers with CPUs, GPUs, and TPUs is done via tf.distribute.Strategy APIs.
The following example demonstrates how to use two of these strategies: tf.distribute.experimental.ParameterServerStrategy and tf.distribute.MultiWorkerMirroredStrategy, both of which are designed for CPU/GPU training with multiple workers.
ParameterServerStrategy employs a coordinator ('chief'), which makes it more friendly to the environment of this Colab notebook. You will use some utilities here to set up the supporting elements essential for a runnable experience: you will create an in-process cluster, where threads are used to simulate the parameter servers ('ps') and workers ('worker'). For more information about parameter server training, refer to the Parameter server training with ParameterServerStrategy tutorial.
In this example, first define the 'TF_CONFIG' environment variable with a tf.distribute.cluster_resolver.TFConfigClusterResolver to provide the cluster information. If you use a cluster management system for your distributed training, check whether it already provides 'TF_CONFIG' for you, in which case you don't need to set this environment variable explicitly. (Learn more in the Setting up the 'TF_CONFIG' environment variable section of the Distributed training with TensorFlow guide.)
# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]
# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps': ["localhost:%s" % port for port in ps_ports],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
Then, create tf.distribute.Servers for the workers and parameter servers one by one:
# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4
for i in range(3):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="worker",
task_index=i,
config=worker_config)
for i in range(2):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="ps",
task_index=i)
In real-world distributed training, instead of starting all the tf.distribute.Servers on the coordinator, you will use multiple machines, and the ones designated as "worker"s and "ps" (parameter servers) will each run a tf.distribute.Server. Refer to the Clusters in the real world section of the Parameter server training tutorial for more details; a minimal sketch of what each such machine could run follows below.
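The sketch below assumes each remote process already has its own 'TF_CONFIG' set (with that machine's 'task' type and index), as done earlier in this notebook for the coordinator; it only uses the standard tf.distribute.Server API:
# Minimal sketch of a real "worker" or "ps" process.
# Assumption: this process's 'TF_CONFIG' environment variable is already set.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()  # Block so the worker/ps process stays alive to serve requests.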
With everything ready, create the ParameterServerStrategy object:
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1
Once you have created the strategy object, define the model, the optimizer, and other variables, and call the Keras Model.compile within the Strategy.scope API to distribute the training. (Refer to the Strategy.scope API docs for more information.)
If you prefer to customize your training by, for instance, defining the forward and backward passes, refer to the Training with a custom training loop section of the Parameter server training tutorial for more details; a condensed sketch of that approach follows the Model.fit example below.
dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)
eval_dataset = tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
model.compile(optimizer, "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step Epoch 2/5 10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step Epoch 3/5 10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step Epoch 4/5 10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step Epoch 5/5 10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step <keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114 2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 10/10 [==============================] - 2s 38ms/step - loss: 3.8431 2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } {'loss': 3.843122}
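For reference, here is a condensed, non-authoritative sketch of the custom-training-loop alternative mentioned above. It reuses the strategy, features, and labels defined earlier and relies on the tf.distribute.experimental.coordinator.ClusterCoordinator API described in the Parameter server training tutorial:
# Condensed sketch of a custom training loop with `ParameterServerStrategy`.
# Assumes `strategy`, `features`, and `labels` from earlier in this notebook.
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

with strategy.scope():
  loop_model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  loop_optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)

@tf.function
def per_worker_train_step(iterator):
  def step_fn(inputs):
    x, y = inputs
    with tf.GradientTape() as tape:
      loss = tf.reduce_mean(
          tf.keras.losses.mean_squared_error(y, loop_model(x)))
    grads = tape.gradient(loss, loop_model.trainable_variables)
    loop_optimizer.apply_gradients(zip(grads, loop_model.trainable_variables))
    return loss
  return strategy.run(step_fn, args=(next(iterator),))

def dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(8)

per_worker_dataset = coordinator.create_per_worker_dataset(dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
for _ in range(10):
  coordinator.schedule(per_worker_train_step, args=(per_worker_iterator,))
coordinator.join()  # Wait for all scheduled steps to complete.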
Partitioners (tf.distribute.experimental.partitioners)
ParameterServerStrategy in TensorFlow 2 supports variable partitioning and offers the same partitioners as TensorFlow 1, with less confusing names:
- tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner: a partitioner that keeps shards under a maximum size.
- tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner: a partitioner that allocates a minimum size per shard.
- tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner: a partitioner that allocates a fixed number of shards.
A sketch of passing one of these partitioners to the strategy is shown below.
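As a brief illustration (reusing the cluster_resolver from earlier; the shard size and count below are arbitrary example values), variable partitioning is opted into by passing a partitioner to the strategy's variable_partitioner argument:
# Sketch: enable variable partitioning across the parameter servers.
# The shard size and shard count below are illustrative values only.
partitioner = tf.distribute.experimental.partitioners.MinSizePartitioner(
    min_shard_bytes=256 << 10,  # Keep each shard at least 256 KiB.
    max_shards=2)               # Don't split across more than 2 parameter servers.
partitioned_strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=partitioner)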
Alternatively, you can use a MultiWorkerMirroredStrategy object:
# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
You can replace the strategy used above with the MultiWorkerMirroredStrategy object to perform training with this strategy.
As with the tf.estimator APIs, since MultiWorkerMirroredStrategy is a multi-client strategy, there is no easy way to run distributed training in this Colab notebook. Therefore, replacing the code above with this strategy ends up running everything locally. The Multi-worker training with Keras Model.fit / custom training loop tutorials demonstrate how to run multi-worker training with the 'TF_CONFIG' variable set up, with two workers on localhost in Colab. In practice, you would create multiple workers on external IP addresses/ports, and use the 'TF_CONFIG' variable to specify the cluster configuration for each worker, as sketched below.
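As a rough, hedged sketch of that real-world setup (the host names and port below are placeholders, and the dataset defined earlier in this notebook is reused), each worker machine would set its own 'TF_CONFIG' and run the same training program:
# Sketch of the program each worker runs in a real two-worker setup.
# Only 'task.index' differs between the machines; addresses are placeholders.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {'worker': ['host1.example.com:12345', 'host2.example.com:12345']},
    'task': {'type': 'worker', 'index': 0}  # Use index 1 on the second machine.
})
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  model.compile(tf.keras.optimizers.Adagrad(learning_rate=0.05), "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)  # Reuses the `dataset` defined earlier.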
Next steps
To learn more about multi-worker distributed training with tf.distribute.experimental.ParameterServerStrategy and tf.distribute.MultiWorkerMirroredStrategy in TensorFlow 2, consider the following resources:
- Tutorial: Parameter server training with ParameterServerStrategy and Keras Model.fit / a custom training loop
- Tutorial: Multi-worker training with MultiWorkerMirroredStrategy and Keras Model.fit
- Tutorial: Multi-worker training with MultiWorkerMirroredStrategy and a custom training loop
- Guide: Distributed training with TensorFlow
- Guide: Optimize TensorFlow GPU performance with the TensorFlow Profiler
- Guide: Use GPUs (the Using multiple GPUs section)