آموزش چند کارگری با استیماتور

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش نشان می‌دهد که چگونه tf.distribute.Strategy می‌تواند برای آموزش چندکاره‌ای توزیع شده با tf.estimator استفاده شود. اگر کد خود را با استفاده از tf.estimator می نویسید و علاقه مند به مقیاس گذاری فراتر از یک دستگاه با عملکرد بالا هستید، این آموزش برای شما مناسب است.

قبل از شروع، لطفا راهنمای استراتژی توزیع را بخوانید. آموزش آموزش چند GPU نیز مرتبط است، زیرا این آموزش از همان مدل استفاده می کند.

برپایی

ابتدا TensorFlow و واردات لازم را تنظیم کنید.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

تابع ورودی

این آموزش از مجموعه داده های MNIST از TensorFlow Datasets استفاده می کند. کد در اینجا شبیه آموزش آموزش چند GPU با یک تفاوت کلیدی است: هنگام استفاده از Estimator برای آموزش چند کارگری، برای اطمینان از همگرایی مدل، لازم است مجموعه داده بر اساس تعداد کارگران تقسیم شود. داده های ورودی با شاخص کارگر تقسیم می شوند، به طوری که هر کارگر 1/num_workers بخش مجزا از مجموعه داده را پردازش می کند.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

یکی دیگر از رویکردهای معقول برای دستیابی به همگرایی این است که مجموعه داده را با دانه های متمایز در هر کارگر به هم بزنیم.

پیکربندی چند کارگری

یکی از تفاوت های کلیدی در این آموزش (در مقایسه با آموزش آموزش چند GPU ) راه اندازی چند کارگری است. متغیر محیطی TF_CONFIG روش استاندارد برای تعیین پیکربندی خوشه برای هر کارگری است که بخشی از خوشه است.

دو جزء TF_CONFIG وجود دارد: cluster و task . cluster اطلاعاتی در مورد کل خوشه، یعنی کارگران و سرورهای پارامتر در خوشه ارائه می دهد. task اطلاعاتی در مورد وظیفه فعلی ارائه می دهد. اولین cluster مؤلفه برای همه کارگران و سرورهای پارامتر در خوشه یکسان است و task مؤلفه دوم در هر کارگر و سرور پارامتر متفاوت است و type و index خود را مشخص می کند. در این مثال type وظیفه worker و index وظیفه 0 است.

برای اهداف تصویری، این آموزش نحوه تنظیم یک TF_CONFIG با 2 کارگر در localhost هاست را نشان می دهد. در عمل، چندین کارگر را روی یک آدرس IP و پورت خارجی ایجاد می‌کنید و TF_CONFIG را روی هر کارگر به‌طور مناسب تنظیم می‌کنید، یعنی index وظایف را تغییر می‌دهید.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

مدل را تعریف کنید

لایه ها، بهینه ساز و تابع ضرر را برای آموزش بنویسید. این آموزش مدل را با لایه‌های Keras تعریف می‌کند، شبیه به آموزش چند GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

برای آموزش مدل، از نمونه tf.distribute.experimental.MultiWorkerMirroredStrategy استفاده کنید. MultiWorkerMirroredStrategy کپی‌هایی از همه متغیرها در لایه‌های مدل در هر دستگاه در همه کارگران ایجاد می‌کند. از CollectiveOps ، یک عملیات TensorFlow برای ارتباطات جمعی، برای جمع‌آوری گرادیان‌ها و همگام نگه داشتن متغیرها استفاده می‌کند. راهنمای tf.distribute.Strategy جزئیات بیشتری در مورد این استراتژی دارد.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

آموزش و ارزیابی مدل

سپس، استراتژی توزیع را در RunConfig برای برآوردگر مشخص کنید و با فراخوانی tf.estimator.train_and_evaluate آموزش و ارزیابی کنید. این آموزش فقط آموزش را با مشخص کردن استراتژی از طریق train_distribute می کند. همچنین امکان توزیع ارزیابی از طریق eval_distribute دارد.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
use `update_config_proto` instead.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.10881.
INFO:tensorflow:Loss for final step: 1.10881.
({'loss': 2.234131, 'global_step': 938}, [])

بهینه سازی عملکرد تمرین

شما اکنون یک مدل و یک برآوردگر با قابلیت چندکاره دارید که توسط tf.distribute.Strategy طراحی شده است. برای بهینه سازی عملکرد آموزش چندکاره می توانید تکنیک های زیر را امتحان کنید:

  • اندازه دسته را افزایش دهید: اندازه دسته مشخص شده در اینجا به ازای هر GPU است. به طور کلی، بزرگ ترین اندازه دسته ای که متناسب با حافظه GPU باشد توصیه می شود.
  • متغیرهای Cast: در صورت امکان متغیرها را به tf.float . مدل رسمی ResNet شامل مثالی از نحوه انجام این کار است.
  • از ارتباطات جمعی استفاده کنید: MultiWorkerMirroredStrategy چندین پیاده سازی ارتباط جمعی را ارائه می دهد.

    • RING مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی متقابل میزبان پیاده سازی می کند.
    • NCCL از NCCL انویدیا برای پیاده سازی جمع ها استفاده می کند.
    • AUTO انتخاب را به زمان اجرا موکول می کند.

    بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPUها و اتصال شبکه در خوشه بستگی دارد. برای نادیده گرفتن انتخاب خودکار، یک مقدار معتبر برای پارامتر communication سازنده MultiWorkerMirroredStrategy ، به عنوان مثال communication=tf.distribute.experimental.CollectiveCommunication.NCCL کنید.

برای کسب اطلاعات بیشتر در مورد سایر استراتژی ها و ابزارهایی که می توانید برای بهینه سازی عملکرد مدل های TensorFlow خود از آنها استفاده کنید، از بخش عملکرد در راهنما دیدن کنید.

نمونه کدهای دیگر

  1. مثال پایانی برای آموزش چند کارگری در تنسورفلو/اکوسیستم با استفاده از الگوهای Kubernetes. این مثال با یک مدل Keras شروع می شود و با استفاده از tf.keras.estimator.model_to_estimator API آن را به یک برآوردگر تبدیل می کند.
  2. مدل‌های رسمی ، که بسیاری از آنها را می‌توان برای اجرای چندین استراتژی توزیع پیکربندی کرد.