추정기(Estimator)를 사용한 다중 작업자 훈련

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

경고: Estimator는 새 코드에 권장되지 않습니다. Estimator는 v1.Session 스타일 코드를 실행하는데, 이 코드는 올바르게 작성하기가 좀 더 어렵고 특히 TF 2 코드와 결합할 경우 예기치 않게 작동할 수 있습니다. Estimator는 호환성 보장이 적용되지만 보안 취약점 외에는 수정 사항이 제공되지 않습니다. 자세한 내용은 마이그레이션 가이드를 참조하세요.

개요

참고: tf.distribute API와 함께 추정기를 사용할 수는 있지만, tf.distribute와 함께 Keras를 사용하는 것을 추천합니다. Keras를 사용한 다중 작업자 훈련을 참조하세요. tf.distribute.Strategy를 추정기와 사용하는 것은 부분적으로만 지원하고 있습니다.

이 튜토리얼은 tf.estimator를 이용한 분산 다중 작업자 훈련을 위해 tf.distribute.Strategy를 사용하는 방법을 보여줍니다. tf.estimator를 사용하여 코드를 작성하고 있고, 고성능의 장비 한 대로 다룰 수 있는 것보다 더 큰 작업을 수행하는 데 관심이 있다면 이 튜토리얼이 적합합니다.

시작하기 전에 분산 전략 가이드를 읽어주세요. 이 튜토리얼과 같은 모델을 사용하는 다중 GPU 훈련 튜토리얼도 관련이 있습니다.

설정

먼저, 텐서플로를 설정하고 필요한 패키지들을 가져옵니다.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
2022-12-15 01:58:31.517648: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-12-15 01:58:31.517754: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2022-12-15 01:58:31.517764: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

참고: TF2.4부터 다중 작업자 미러링 방법은 즉시 실행이 활성화된 경우(기본 설정) 추정기에서 오류를 일으킵니다. TF2.4의 오류는 TypeError: cannot pickle '_thread.lock' object입니다. 자세한 내용은 이슈 #46556을 참조하세요. 해결 방법은 즉시 실행을 비활성화하는 것입니다.

tf.compat.v1.disable_eager_execution()

입력 함수

이 튜토리얼은 TensorFlow 데이터세트의 MNIST 데이터세트를 사용합니다. 코드 내용은 다중 GPU 훈련 튜토리얼과 유사하지만 큰 차이점이 하나 있습니다. 바로 추정기를 사용하여 다중 작업자 훈련을 할 때는 데이터세트를 작업자 숫자대로 나누어 주어야 모델이 수렴합니다. 입력 데이터는 작업자 인덱스로 샤딩(shard)합니다. 그러면 각 작업자가 데이터세트의 1/num_workers 고유 부분을 처리합니다.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

훈련을 수렴시키기 위한 또 다른 방법으로 각 작업자에서 데이터세트를 제각기 다른 시드 값으로 셔플하는 것도 있습니다.

다중 작업자 구성

다중 GPU 훈련 튜토리얼과 비교할 때 가장 큰 차이 중 하나는 다중 워커를 설정하는 부분입니다. TF_CONFIG 환경 변수는 클러스터를 이루는 각 워커에 클러스터 설정을 지정하는 표준 방법입니다.

TF_CONFIG에는 clustertask라는 두 가지 구성 요소가 있습니다. cluster는 전체 클러스터, 다시 말해 클러스터에 속한 작업자와 매개변수 서버에 대한 정보를 제공합니다. task는 현재 작업에 대한 정보를 제공합니다. 첫 번째 구성 요소 cluster는 모든 작업자 및 매개변수 서버에 대해 동일하며 두 번째 구성 요소 task는 각 작업자 및 매개변수 서버에서 다르며 고유한 typeindex를 지정합니다. 이 예제에서는 작업의 typeworker이고, 작업의 index0입니다.

예를 들기 위해 이 튜토리얼에서는 두 개의 작업자를 localhost에 띄울 때의 TF_CONFIG를 보여드리겠습니다. 실제로는 외부 IP 주소 및 포트에 여러 작업자를 만들고 각 작업자에 대해 TF_CONFIG를 적절하게 설정합니다. 예를 들어 index 작업을 수정합니다.

경고: Colab에서 다음 코드를 실행하지 마세요. TensorFlow의 런타임은 지정된 IP 주소 및 포트에서 gRPC 서버를 생성하려고 시도하지만 실패할 가능성이 높습니다. 단일 시스템에서 여러 작업자를 테스트 실행하는 방법의 예는 이 튜토리얼의 keras 버전을 참조하세요.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

모델 정의하기

훈련을 위하여 레이어와 옵티마이저, 손실 함수를 정의하세요. 이 튜토리얼에서는 다중 GPU 훈련 튜토리얼과 비슷하게 케라스 레이어로 모델을 정의합니다.

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

모델을 훈련하기 위하여 tf.distribute.experimental.MultiWorkerMirroredStrategy의 인스턴스를 사용하세요. MultiWorkerMirroredStrategy는 모든 워커의 각 장비에, 모델의 레이어에 있는 모든 변수의 복사본을 만듭니다. 이 전략은 CollectiveOps라는 수집을 위한 통신용 텐서플로 연산을 사용하여 그래디언트를 모으고, 변수들의 값을 동일하게 맞춥니다. 텐서플로로 분산 훈련하기에 이 전략에 대한 더 자세한 내용이 있습니다.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_857867/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0', '/device:GPU:1', '/device:GPU:2', '/device:GPU:3'), communication = CommunicationImplementation.AUTO

모델 훈련 및 평가하기

다음으로, 추정기의 RunConfig에 분산 전략을 지정하십시오. 그리고 tf.estimator.train_and_evaluate로 훈련 및 평가를 합니다. 이 튜토리얼에서는 train_distribute로만 전략을 지정하였기 때문에 훈련 과정만 분산 처리합니다. eval_distribute를 지정하여 평가도 분산 처리할 수 있습니다.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7faca03586a0>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
use `update_config_proto` instead.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/data/ops/dataset_ops.py:461: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.AUTO, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.AUTO, num_packs = 1
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Collective all_reduce tensors: 1 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.AUTO, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 1 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.AUTO, num_packs = 1
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/autograph/pyct/static_analysis/liveness.py:83: Analyzer.lamba_check (from tensorflow.python.autograph.pyct.static_analysis.liveness) is deprecated and will be removed after 2023-09-23.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/autograph/pyct/static_analysis/liveness.py:83: Analyzer.lamba_check (from tensorflow.python.autograph.pyct.static_analysis.liveness) is deprecated and will be removed after 2023-09-23.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2022-12-15 01:58:42.017152: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.018439: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.047657: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.048658: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.053870: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.054532: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.063150: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-12-15 01:58:42.063741: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.3004613, step = 0
INFO:tensorflow:loss = 2.3004613, step = 0
INFO:tensorflow:global_step/sec: 142.044
INFO:tensorflow:global_step/sec: 142.044
INFO:tensorflow:loss = 2.281633, step = 100 (0.706 sec)
INFO:tensorflow:loss = 2.281633, step = 100 (0.706 sec)
INFO:tensorflow:global_step/sec: 193.118
INFO:tensorflow:global_step/sec: 193.118
INFO:tensorflow:loss = 2.280754, step = 200 (0.517 sec)
INFO:tensorflow:loss = 2.280754, step = 200 (0.517 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 234...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 234...
INFO:tensorflow:Saving checkpoints for 234 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 234 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 234...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 234...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-12-15T01:58:49
INFO:tensorflow:Starting evaluation at 2022-12-15T01:58:49
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-234
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-234
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 0.84291s
INFO:tensorflow:Inference Time : 0.84291s
INFO:tensorflow:Finished evaluation at 2022-12-15-01:58:49
INFO:tensorflow:Finished evaluation at 2022-12-15-01:58:49
INFO:tensorflow:Saving dict for global step 234: global_step = 234, loss = 2.280501
INFO:tensorflow:Saving dict for global step 234: global_step = 234, loss = 2.280501
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 234: /tmp/multiworker/model.ckpt-234
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 234: /tmp/multiworker/model.ckpt-234
INFO:tensorflow:Loss for final step: 2.2873833.
INFO:tensorflow:Loss for final step: 2.2873833.
({'loss': 2.280501, 'global_step': 234}, [])

훈련 성능 최적화하기

이제 tf.distribute.Strategy에 의해 구동되는 모델과 다중 작업자 지원 Estimator가 있습니다. 다중 작업자 훈련의 성능을 최적화하기 위해 다음 방법을 시도할 수 있습니다.

  • 배치 크기 늘리기: 여기서 지정하는 배치 크기는 GPU당 크기입니다. 일반적으로, GPU 메모리 크기에 맞는 한 가장 크게 배치 크기를 잡는 것이 좋습니다.

  • 변수 형변환: 가능하면 변수를 tf.float 타입으로 바꾸세요. 공식 ResNet 모델의 예제에서 어떻게 변환하는지 볼 수 있습니다.

  • 집합 통신 구현을 사용하세요: MultiWorkerMirroredStrategy는 여러 가지 집합 통신 구현을 제공합니다.

    • RING은 장비 간 통신을 위하여 gRPC를 써서 링 네트워크 기반의 집합 통신을 구현한 것입니다.
    • NCCLNvidia의 NCCL을 사용하여 수집 연산을 구현한 것입니다.
    • AUTO는 런타임이 알아서 고르도록 합니다.

    어떤 집합 구현이 가장 좋은지는 GPU의 숫자와 종류, 클러스터 장비 간 네트워크 연결 등에 따라 다를 수 있습니다. 런타임 자동 선택을 오버라이드하려면, MultiWorkerMirroredStrategy 생성자의 communication 인자에 적절한 값을 주면 됩니다. 예를 들어 communication=tf.distribute.experimental.CollectiveCommunication.NCCL과 같이 주면 됩니다.

가이드의 성능 섹션을 방문하여 TensorFlow 모델의 성능을 최적화하는 데 사용할 수 있는 다른 전략과 도구에 대해 자세히 알아보세요.

다른 코드 예제

  1. 처음부터 끝까지 살펴보는 예제에서는 tensorflow/ecosystem의 쿠버네티스(Kubernetes) 템플릿을 이용하여 다중 워커를 사용하여 훈련합니다. 이 예제에서는 케라스 모델을 만든 후, tf.keras.estimator.model_to_estimator API를 이용하여 추정기 모델로 변환합니다.
  2. 많은 부분을 다중 분산 전략으로 실행할 수 있는 공식 모델.