ย้ายข้อมูลการฝึกอบรม CPU/GPU สำหรับผู้ปฏิบัติงานหลายคน

คู่มือนี้สาธิตวิธีการโยกย้ายเวิร์กโฟลว์การฝึกอบรมแบบกระจายของผู้ปฏิบัติงานหลายคนจาก TensorFlow 1 เป็น TensorFlow 2

ในการดำเนินการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย CPU/GPU:



# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
= [[0.3], [0.5], [0.7]]
= [[4., 4.5], [5., 5.5], [6., 6.5]]
= [[0.8], [0.9], [1.]]

คุณจะต้องใช้ตัวแปรสภาพแวดล้อมการกำหนดค่า 'TF_CONFIG' สำหรับการฝึกอบรมบนเครื่องหลายเครื่องใน TensorFlow ใช้ 'TF_CONFIG' เพื่อระบุที่อยู่ 'cluster' และ 'task' (เรียนรู้เพิ่มเติมในคู่มือ Distributed_training )

import json
import os

= {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
'task': {'type': 'chief', 'index': 0}

.environ['TF_CONFIG'] = json.dumps(tf_config)

ใช้คำสั่ง del เพื่อลบตัวแปร (แต่ในการฝึกอบรมผู้ปฏิบัติงานหลายคนในโลกแห่งความเป็นจริงใน TensorFlow 1 คุณจะไม่ต้องทำสิ่งนี้):

del os.environ['TF_CONFIG']

TensorFlow 1: Multi-worker กระจายการฝึกอบรมด้วย tf.estimator APIs

ข้อมูลโค้ดต่อไปนี้สาธิตเวิร์กโฟลว์ที่เป็นที่ยอมรับของการฝึกอบรมผู้ปฏิบัติงานหลายคนใน TF1: คุณจะใช้ tf.estimator.Estimator , a tf.estimator.TrainSpec , tf.estimator.EvalSpec และ tf.estimator.train_and_evaluate API เพื่อแจกจ่าย การฝึกอบรม:

def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
= tf1.layers.Dense(1)(features)
= tf1.losses.mean_squared_error(labels=labels, predictions=logits)
= tf1.train.AdagradOptimizer(0.05)
= optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

= tf1.estimator.Estimator(model_fn=_model_fn)
= tf1.estimator.TrainSpec(input_fn=_input_fn)
= tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.038075272, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.13630s
INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.061832994.
({'loss': 0.005215075, 'global_step': 3}, [])

TensorFlow 2: การฝึกอบรมพนักงานหลายคนพร้อมกลยุทธ์การจัดจำหน่าย

ใน TensorFlow 2 การฝึกอบรมแบบกระจายไปยังผู้ปฏิบัติงานหลายคนด้วย CPU, GPU และ TPU ทำได้ผ่าน tf.distribute.Strategy s

ตัวอย่างต่อไปนี้สาธิตวิธีใช้กลยุทธ์สองแบบดังกล่าว: tf.distribute.experimental.ParameterServerStrategy และ tf.distribute.MultiWorkerMirroredStrategy ซึ่งทั้งสองวิธีนี้ได้รับการออกแบบมาสำหรับการฝึกอบรม CPU/GPU กับผู้ปฏิบัติงานหลายคน

ParameterServerStrategy ใช้ผู้ ประสานงาน ( 'chief' ) ซึ่งทำให้เป็นมิตรกับสิ่งแวดล้อมในสมุดบันทึก Colab นี้มากขึ้น คุณจะใช้ยูทิลิตี้บางอย่างที่นี่เพื่อตั้งค่าองค์ประกอบสนับสนุนที่จำเป็นสำหรับประสบการณ์ที่รันได้ที่นี่: คุณจะสร้าง คลัสเตอร์ในกระบวนการ โดยที่เธรดใช้เพื่อจำลองเซิร์ฟเวอร์พารามิเตอร์ ( 'ps' ) และผู้ปฏิบัติงาน ( 'worker' ) . สำหรับข้อมูลเพิ่มเติมเกี่ยวกับการฝึกเซิร์ฟเวอร์พารามิเตอร์ โปรดดูที่ การ ฝึกเซิร์ฟเวอร์พารามิเตอร์ด้วยบทช่วยสอน ParameterServerStrategy

ในตัวอย่างนี้ ขั้นแรกให้กำหนดตัวแปรสภาพแวดล้อม 'TF_CONFIG' ด้วย tf.distribute.cluster_resolver.TFConfigClusterResolver เพื่อจัดเตรียมข้อมูลคลัสเตอร์ หากคุณกำลังใช้ระบบการจัดการคลัสเตอร์สำหรับการฝึกอบรมแบบกระจาย ให้ตรวจสอบว่าระบบมี 'TF_CONFIG' ให้คุณอยู่แล้วหรือไม่ ซึ่งในกรณีนี้ คุณไม่จำเป็นต้องตั้งค่าตัวแปรสภาพแวดล้อมนี้อย่างชัดเจน (เรียนรู้เพิ่มเติมในการ ตั้งค่าส่วนตัวแปรสภาพแวดล้อม 'TF_CONFIG' ในคู่มือ การฝึกอบรมแบบกระจายด้วย TensorFlow )

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

= portpicker.pick_unused_port()
= [portpicker.pick_unused_port() for _ in range(3)]
= [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
= {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps':  ["localhost:%s" % port for port in ps_ports],
'task': {'type': 'chief', 'index': 0}
.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
= tf.distribute.cluster_resolver.TFConfigClusterResolver()

จากนั้น สร้าง tf.distribute.Server สำหรับผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์ทีละรายการ:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
= tf.compat.v1.ConfigProto()
.inter_op_parallelism_threads = 4

for i in range(3):

for i in range(2):

ในการฝึกอบรมแบบกระจายในโลกแห่งความเป็นจริง แทนที่จะเริ่ม tf.distribute.Server ทั้งหมดบนผู้ประสานงาน คุณจะใช้หลายเครื่อง และแต่ละเครื่องที่กำหนดให้เป็น "worker" และ "ps" (เซิร์ฟเวอร์พารามิเตอร์) จะใช้แต่ละเครื่อง เรียกใช้ tf.distribute.Server อ้างถึง Clusters ในส่วนโลกแห่งความจริง ในบทช่วยสอน การฝึกอบรมเซิร์ฟเวอร์ Parameter สำหรับรายละเอียดเพิ่มเติม

เมื่อพร้อมทุกอย่างแล้ว ให้สร้างวัตถุ ParameterServerStrategy :

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

เมื่อคุณสร้างอ็อบเจ็กต์กลยุทธ์แล้ว ให้กำหนดโมเดล เครื่องมือเพิ่มประสิทธิภาพ และตัวแปรอื่นๆ และเรียก Keras Model.compile ภายใน Strategy.scope API เพื่อแจกจ่ายการฝึกอบรม (ดูเอกสาร API ของ Strategy.scope สำหรับข้อมูลเพิ่มเติม)

หากคุณต้องการปรับแต่งการฝึกของคุณโดยกำหนดรูปแบบการส่งต่อและถอยหลัง ให้ดูที่หัวข้อ การฝึกด้วยลูปการฝึกแบบกำหนดเอง ในบทช่วยสอน การฝึกเซิร์ฟเวอร์พารามิเตอร์ สำหรับรายละเอียดเพิ่มเติม

dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)

= tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
= tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
= tf.keras.optimizers.Adagrad(learning_rate=0.05)
.compile(optimizer, "mse")

.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
{'loss': 3.843122}

พาร์ติชั่น ( tf.distribute.experimental.partitioners )

ParameterServerStrategy ใน TensorFlow 2 รองรับการแบ่งพาร์ติชันแบบแปรผันและเสนอตัวแบ่งพาร์ติชันแบบเดียวกับ TensorFlow 1 โดยมีชื่อที่สับสนน้อยกว่า: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : ตัวแบ่งพาร์ติชั่นที่ช่วยให้ชาร์ดมีขนาดสูงสุด) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : ตัวแบ่งพาร์ติชั่นที่จัดสรรขนาดต่ำสุดต่อชาร์ด - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : ตัวแบ่งพาร์ติชันที่จัดสรรส่วนแบ่งข้อมูลจำนวนคงที่

หรือ คุณสามารถใช้วัตถุ MultiWorkerMirroredStrategy :

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
= tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

คุณสามารถแทนที่กลยุทธ์ที่ใช้ด้านบนด้วยวัตถุ MultiWorkerMirroredStrategy เพื่อดำเนินการฝึกอบรมด้วยกลยุทธ์นี้

เช่นเดียวกับ tf.estimator API เนื่องจาก MultiWorkerMirroredStrategy เป็นกลยุทธ์แบบหลายลูกค้า จึงไม่มีวิธีง่าย ๆ ในการรันการฝึกอบรมแบบกระจายในสมุดบันทึก Colab นี้ ดังนั้น การแทนที่โค้ดด้านบนด้วยกลยุทธ์นี้จะจบลงด้วยการรันสิ่งต่าง ๆ ในเครื่อง การฝึกอบรมผู้ปฏิบัติงานหลายคน ด้วย Keras Model.fit / บทช่วยสอน การฝึกอบรมแบบกำหนดเองจะ สาธิตวิธีเรียกใช้การฝึกอบรมผู้ปฏิบัติงานหลายคนด้วยการตั้งค่าตัวแปร 'TF_CONFIG' โดยมีพนักงานสองคนบนโฮสต์ในพื้นที่ใน Colab ในทางปฏิบัติ คุณจะต้องสร้างผู้ปฏิบัติงานหลายคนบนที่อยู่/พอร์ต IP ภายนอก และใช้ตัวแปร 'TF_CONFIG' เพื่อระบุการกำหนดค่าคลัสเตอร์สำหรับผู้ปฏิบัติงานแต่ละคน


หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการฝึกอบรมแบบกระจายผู้ปฏิบัติงานหลายคนด้วย tf.distribute.experimental.ParameterServerStrategy และ tf.distribute.MultiWorkerMirroredStrategy ใน TensorFlow 2 ให้พิจารณาแหล่งข้อมูลต่อไปนี้: