مشاهده در TensorFlow.org | در Google Colab اجرا شود | مشاهده منبع در GitHub | دانلود دفترچه یادداشت |
بررسی اجمالی
این آموزش نشان میدهد که چگونه tf.distribute.Strategy
میتواند برای آموزش چندکارهای توزیع شده با tf.estimator
استفاده شود. اگر کد خود را با استفاده از tf.estimator
می نویسید و علاقه مند به مقیاس گذاری فراتر از یک دستگاه با عملکرد بالا هستید، این آموزش برای شما مناسب است.
قبل از شروع، لطفا راهنمای استراتژی توزیع را بخوانید. آموزش آموزش چند GPU نیز مرتبط است، زیرا این آموزش از همان مدل استفاده می کند.
برپایی
ابتدا TensorFlow و واردات لازم را تنظیم کنید.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
تابع ورودی
این آموزش از مجموعه داده های MNIST از TensorFlow Datasets استفاده می کند. کد در اینجا شبیه آموزش آموزش چند GPU با یک تفاوت کلیدی است: هنگام استفاده از Estimator برای آموزش چند کارگری، برای اطمینان از همگرایی مدل، لازم است مجموعه داده بر اساس تعداد کارگران تقسیم شود. داده های ورودی با شاخص کارگر تقسیم می شوند، به طوری که هر کارگر 1/num_workers
بخش مجزا از مجموعه داده را پردازش می کند.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
یکی دیگر از رویکردهای معقول برای دستیابی به همگرایی این است که مجموعه داده را با دانه های متمایز در هر کارگر به هم بزنیم.
پیکربندی چند کارگری
یکی از تفاوت های کلیدی در این آموزش (در مقایسه با آموزش آموزش چند GPU ) راه اندازی چند کارگری است. متغیر محیطی TF_CONFIG
روش استاندارد برای تعیین پیکربندی خوشه برای هر کارگری است که بخشی از خوشه است.
دو جزء TF_CONFIG
وجود دارد: cluster
و task
. cluster
اطلاعاتی در مورد کل خوشه، یعنی کارگران و سرورهای پارامتر در خوشه ارائه می دهد. task
اطلاعاتی در مورد وظیفه فعلی ارائه می دهد. اولین cluster
مؤلفه برای همه کارگران و سرورهای پارامتر در خوشه یکسان است و task
مؤلفه دوم در هر کارگر و سرور پارامتر متفاوت است و type
و index
خود را مشخص می کند. در این مثال type
وظیفه worker
و index
وظیفه 0
است.
برای اهداف تصویری، این آموزش نحوه تنظیم یک TF_CONFIG
با 2 کارگر در localhost
هاست را نشان می دهد. در عمل، چندین کارگر را روی یک آدرس IP و پورت خارجی ایجاد میکنید و TF_CONFIG
را روی هر کارگر بهطور مناسب تنظیم میکنید، یعنی index
وظایف را تغییر میدهید.
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
مدل را تعریف کنید
لایه ها، بهینه ساز و تابع ضرر را برای آموزش بنویسید. این آموزش مدل را با لایههای Keras تعریف میکند، شبیه به آموزش چند GPU .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredStrategy
برای آموزش مدل، از نمونه tf.distribute.experimental.MultiWorkerMirroredStrategy
استفاده کنید. MultiWorkerMirroredStrategy
کپیهایی از همه متغیرها در لایههای مدل در هر دستگاه در همه کارگران ایجاد میکند. از CollectiveOps
، یک عملیات TensorFlow برای ارتباطات جمعی، برای جمعآوری گرادیانها و همگام نگه داشتن متغیرها استفاده میکند. راهنمای tf.distribute.Strategy
جزئیات بیشتری در مورد این استراتژی دارد.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
آموزش و ارزیابی مدل
سپس، استراتژی توزیع را در RunConfig
برای برآوردگر مشخص کنید و با فراخوانی tf.estimator.train_and_evaluate
آموزش و ارزیابی کنید. این آموزش فقط آموزش را با مشخص کردن استراتژی از طریق train_distribute
می کند. همچنین امکان توزیع ارزیابی از طریق eval_distribute
دارد.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
بهینه سازی عملکرد تمرین
شما اکنون یک مدل و یک برآوردگر با قابلیت چندکاره دارید که توسط tf.distribute.Strategy
طراحی شده است. برای بهینه سازی عملکرد آموزش چندکاره می توانید تکنیک های زیر را امتحان کنید:
- اندازه دسته را افزایش دهید: اندازه دسته مشخص شده در اینجا به ازای هر GPU است. به طور کلی، بزرگ ترین اندازه دسته ای که متناسب با حافظه GPU باشد توصیه می شود.
- متغیرهای Cast: در صورت امکان متغیرها را به
tf.float
. مدل رسمی ResNet شامل مثالی از نحوه انجام این کار است. از ارتباطات جمعی استفاده کنید:
MultiWorkerMirroredStrategy
چندین پیاده سازی ارتباط جمعی را ارائه می دهد.-
RING
مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی متقابل میزبان پیاده سازی می کند. -
NCCL
از NCCL انویدیا برای پیاده سازی جمع ها استفاده می کند. -
AUTO
انتخاب را به زمان اجرا موکول می کند.
بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPUها و اتصال شبکه در خوشه بستگی دارد. برای نادیده گرفتن انتخاب خودکار، یک مقدار معتبر برای پارامتر
communication
سازندهMultiWorkerMirroredStrategy
، به عنوان مثالcommunication=tf.distribute.experimental.CollectiveCommunication.NCCL
کنید.-
برای کسب اطلاعات بیشتر در مورد سایر استراتژی ها و ابزارهایی که می توانید برای بهینه سازی عملکرد مدل های TensorFlow خود از آنها استفاده کنید، از بخش عملکرد در راهنما دیدن کنید.
نمونه کدهای دیگر
- مثال پایانی برای آموزش چند کارگری در تنسورفلو/اکوسیستم با استفاده از الگوهای Kubernetes. این مثال با یک مدل Keras شروع می شود و با استفاده از
tf.keras.estimator.model_to_estimator
API آن را به یک برآوردگر تبدیل می کند. - مدلهای رسمی ، که بسیاری از آنها را میتوان برای اجرای چندین استراتژی توزیع پیکربندی کرد.