حلقة تدريب مخصصة مع Keras و MultiWorkerMirroredStrategy

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يوضح هذا البرنامج التعليمي تدريبًا متعدد العمال باستخدام حلقة تدريب مخصصة API ، موزعة عبر MultiWorkerMirroredStrategy ، لذلك يمكن لنموذج Keras المصمم للتشغيل على عامل واحد أن يعمل بسلاسة على العديد من العمال مع الحد الأدنى من تغيير الرمز.

نحن نستخدم حلقات تدريب مخصصة لتدريب نموذجنا لأنها تمنحنا مرونة وتحكمًا أكبر في التدريب. علاوة على ذلك ، من الأسهل تصحيح أخطاء النموذج وحلقة التدريب. يتوفر المزيد من المعلومات التفصيلية في كتابة حلقة تدريبية من البداية .

إذا كنت تبحث عن كيفية استخدام MultiWorkerMirroredStrategy مع model.fit ، فراجع هذا البرنامج التعليمي بدلاً من ذلك.

يتوفر التدريب الموزع في دليل TensorFlow للحصول على نظرة عامة على استراتيجيات التوزيع التي يدعمها TensorFlow للمهتمين بفهم أعمق لـ tf.distribute.Strategy APIs.

يثبت

أولا ، بعض الواردات الضرورية.

import json
import os
import sys

قبل استيراد TensorFlow ، قم بإجراء بعض التغييرات على البيئة.

قم بتعطيل كافة وحدات معالجة الرسومات. هذا يمنع الأخطاء التي يسببها جميع العمال الذين يحاولون استخدام نفس GPU. للحصول على تطبيق حقيقي ، سيكون كل عامل على جهاز مختلف.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

أعد تعيين متغير البيئة TF_CONFIG ، وسترى المزيد حول هذا لاحقًا.

os.environ.pop('TF_CONFIG', None)

تأكد من أن الدليل الحالي على مسار بايثون. يسمح هذا للكمبيوتر الدفتري باستيراد الملفات المكتوبة بواسطة %%writefile لاحقًا.

if '.' not in sys.path:
  sys.path.insert(0, '.')

الآن قم باستيراد TensorFlow.

import tensorflow as tf

تعريف نموذج ومجموعة البيانات

بعد ذلك ، قم بإنشاء ملف mnist.py بنموذج بسيط وإعداد مجموعة بيانات. سيتم استخدام ملف python بواسطة العمليات العاملة في هذا البرنامج التعليمي:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

تكوين متعدد العمال

الآن دعنا ندخل عالم التدريب متعدد العمال. في TensorFlow ، مطلوب متغير بيئة TF_CONFIG للتدريب على أجهزة متعددة ، لكل منها دور مختلف. TF_CONFIG المستخدم أدناه ، عبارة عن سلسلة JSON تستخدم لتحديد تكوين الكتلة على كل عامل يمثل جزءًا من الكتلة. هذه هي الطريقة الافتراضية لتحديد العنقود ، باستخدام cluster_resolver.TFConfigClusterResolver ، ولكن هناك خيارات أخرى متاحة في وحدة distribute.cluster_resolver .

صف مجموعتك

فيما يلي مثال على التكوين:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

إليك نفس TF_CONFIG المتسلسلة كسلسلة JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

هناك نوعان من مكونات TF_CONFIG : cluster task .

  • cluster هي نفسها لجميع العمال وتوفر معلومات حول مجموعة التدريب ، والتي تتكون من أنواع مختلفة من الوظائف مثل worker . في التدريب متعدد العمال باستخدام MultiWorkerMirroredStrategy ، عادة ما يكون هناك worker واحد يتحمل مسؤولية أكبر قليلاً مثل حفظ نقطة التفتيش وكتابة ملف ملخص لـ TensorBoard بالإضافة إلى ما يفعله worker العادي. يُشار إلى هذا العامل على أنه chief العمال ، ومن المعتاد أن يتم تعيين worker ذي index 0 باعتباره worker الرئيسي (في الواقع هذه هي الطريقة tf.distribute.Strategy ).

  • task توفر معلومات عن المهمة الحالية وتختلف على كل عامل. تحدد type index ذلك العامل.

في هذا المثال ، تقوم بتعيين type المهمة إلى "worker" index المهام على 0 . هذه الآلة هي العامل الأول وسيتم تعيينه كرئيس للعمال ويقوم بعمل أكثر من الآخرين. لاحظ أن الأجهزة الأخرى ستحتاج إلى مجموعة متغير البيئة TF_CONFIG أيضًا ، ويجب أن يكون لها نفس cluster الإملاء ، ولكن type مهمة أو index مهام مختلف اعتمادًا على أدوار تلك الأجهزة.

لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيف يمكن للمرء تعيين TF_CONFIG مع عاملين على localhost المحلي. من الناحية العملية ، يمكن للمستخدمين إنشاء عدة عمال على عناوين / منافذ IP خارجية ، وتعيين TF_CONFIG على كل عامل بشكل مناسب.

في هذا المثال سوف تستخدم عاملين ، يظهر TF_CONFIG العامل الأول أعلاه. بالنسبة للعامل الثاني ، يمكنك تعيين tf_config['task']['index']=1

أعلاه ، tf_config هو مجرد متغير محلي في بيثون. لاستخدامه بالفعل لتكوين التدريب ، يحتاج هذا القاموس إلى التسلسل كـ JSON ، ووضعه في متغير البيئة TF_CONFIG .

متغيرات البيئة والعمليات الفرعية في أجهزة الكمبيوتر المحمولة

ترث العمليات الفرعية متغيرات البيئة من الوالدين. لذلك إذا قمت بتعيين متغير بيئة في عملية jupyter notebook هذه:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

يمكنك الوصول إلى متغير البيئة من عمليات فرعية:

echo ${GREETINGS}
Hello TensorFlow!

في القسم التالي ، ستستخدم هذا لتمرير TF_CONFIG إلى العمليات الفرعية للعمال. لن تقوم مطلقًا بإطلاق وظائفك بهذه الطريقة ، لكنها كافية لأغراض هذا البرنامج التعليمي: لإثبات مثال الحد الأدنى من تعدد العاملين.

MultiWorkerMirroredStrategy

لتدريب النموذج ، استخدم مثيلاً من tf.distribute.MultiWorkerMirroredStrategy ، والذي يقوم بإنشاء نسخ من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. يحتوي دليل tf.distribute.Strategy على مزيد من التفاصيل حول هذه الإستراتيجية.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

استخدم tf.distribute.Strategy.scope لتحديد وجوب استخدام إستراتيجية عند بناء نموذجك. يضعك هذا في " سياق النسخ المتماثل " لهذه الإستراتيجية ، مما يعني أن الإستراتيجية تتحكم في أشياء مثل الموضع المتغير.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

تقاسم بياناتك تلقائيًا بين العاملين

في التدريب متعدد العمال ، لا تكون هناك حاجة بالضرورة إلى تجزئة مجموعة البيانات ، ولكنها تمنحك دلالات مرة واحدة بالضبط مما يجعل المزيد من التدريب أكثر قابلية للتكرار ، أي أن التدريب على عدة عمال يجب أن يكون هو نفسه التدريب على عامل واحد. ملاحظة: قد يتأثر الأداء في بعض الحالات.

انظر: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

تحديد حلقة تدريب مخصصة وتدريب النموذج

حدد مُحسِّنًا

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

حدد خطوة تدريب tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

حفظ واستعادة نقطة التفتيش

يتطلب تنفيذ Checkpointing في حلقة تدريب مخصصة أن يتعامل معها المستخدم بدلاً من استخدام رد اتصال keras. يسمح لك بحفظ أوزان النموذج واستعادتها دون الحاجة إلى حفظ النموذج بالكامل.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

هنا ، ستقوم بإنشاء tf.train.Checkpoint واحد الذي يتتبع النموذج ، والذي تتم إدارته بواسطة tf.train.CheckpointManager بحيث يتم الاحتفاظ بأحدث نقطة فحص فقط.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

الآن ، عندما تحتاج إلى الاستعادة ، يمكنك العثور على أحدث نقطة تفتيش محفوظة باستخدام وظيفة tf.train.latest_checkpoint الملائمة.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

بعد استعادة نقطة التفتيش ، يمكنك الاستمرار في تدريب حلقة التدريب المخصصة الخاصة بك.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

إعداد كود كامل على العاملين

للتشغيل فعليًا باستخدام MultiWorkerMirroredStrategy ستحتاج إلى تشغيل عمليات العمال وتمرير TF_CONFIG إليهم.

مثل ملف mnist.py المكتوب سابقًا ، إليك main.py التي تحتوي على نفس الكود الذي مررنا فيه خطوة بخطوة مسبقًا في هذا colab ، فنحن نكتبه فقط في ملف حتى يقوم كل عامل بتشغيله:

ملف: main.py

Writing main.py

تدريب وتقييم

يحتوي الدليل الحالي الآن على كلا ملفي Python:

ls *.py
main.py
mnist.py

لذا ، قم بتسلسل TF_CONFIG إلى متغيرات البيئة:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن ، يمكنك تشغيل عملية عاملة تقوم بتشغيل main.py واستخدام TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

هناك بعض الأشياء التي يجب ملاحظتها حول الأمر أعلاه:

  1. يستخدم %%bash وهو عبارة عن دفتر ملاحظات "سحري" لتشغيل بعض أوامر bash.
  2. يستخدم العلامة --bg لتشغيل عملية bash في الخلفية ، لأن هذا العامل لن ينتهي. ينتظر جميع العمال قبل أن يبدأ.

لن تقوم عملية العامل ذات الخلفية بطباعة الإخراج إلى هذا الكمبيوتر الدفتري ، لذا فإن &> يعيد توجيه الإخراج إلى ملف ، حتى تتمكن من رؤية ما حدث.

لذلك ، انتظر بضع ثوان حتى تبدأ العملية:

import time
time.sleep(20)

انظر الآن إلى ما تم إخراج ملف سجل العامل حتى الآن:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

يجب أن يقول السطر الأخير من ملف السجل: تم Started server with target: grpc://localhost:12345 . أصبح العامل الأول جاهزًا الآن ، وينتظر جميع العمال الآخرين ليكونوا مستعدين للمضي قدمًا.

لذا قم بتحديث tf_config لعملية العامل الثاني لالتقاط:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن أطلق العامل الثاني. سيبدأ هذا التدريب نظرًا لأن جميع العمال نشطين (لذلك ليست هناك حاجة لخلفية هذه العملية):

python main.py > /dev/null 2>&1

الآن إذا أعدت فحص السجلات التي كتبها العامل الأول ، فسترى أنه شارك في تدريب هذا النموذج:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

تدريب متعدد العمال في العمق

أظهر هذا البرنامج التعليمي سير عمل Custom Training Loop للإعداد متعدد العمال. يتوفر وصف تفصيلي لموضوعات أخرى في model.fit's guide متعدد العمال وقابل للتطبيق على قوائم الشهادات الموثوق بها (CTL).

أنظر أيضا

  1. يوفر التدريب الموزع في دليل TensorFlow نظرة عامة على استراتيجيات التوزيع المتاحة.
  2. النماذج الرسمية ، يمكن تكوين العديد منها لتشغيل إستراتيجيات توزيع متعددة.
  3. يوفر قسم الأداء في الدليل معلومات حول الاستراتيجيات والأدوات الأخرى التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow.