Di chuyển đào tạo CPU / GPU nhiều nhân viên

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Hướng dẫn này trình bày cách di chuyển quy trình đào tạo phân tán nhiều nhân viên của bạn từ TensorFlow 1 sang TensorFlow 2.

Để thực hiện đào tạo nhiều nhân viên với CPU / GPU:

Thành lập

Bắt đầu với một số nhập cần thiết và một tập dữ liệu đơn giản cho mục đích trình diễn:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

Bạn sẽ cần biến môi trường cấu hình 'TF_CONFIG' để đào tạo trên nhiều máy trong TensorFlow. Sử dụng 'TF_CONFIG' để chỉ định địa chỉ 'cluster''task' . (Tìm hiểu thêm trong hướng dẫn phân phối_ đào tạo.)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Sử dụng câu lệnh del để xóa biến (nhưng trong khóa đào tạo nhiều nhân viên trong thế giới thực trong TensorFlow 1, bạn sẽ không phải làm điều này):

del os.environ['TF_CONFIG']

TensorFlow 1: Đào tạo phân tán nhiều nhân viên với API tf.estimator

Đoạn mã sau trình bày quy trình làm việc chuẩn của đào tạo nhiều nhân viên trong TF1: bạn sẽ sử dụng tf.estimator.Estimator , tf.estimator.TrainSpec , tf.estimator.EvalSpectf.estimator.train_and_evaluate API để phân phối Đào tạo:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.038075272, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.13630s
INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.061832994.
({'loss': 0.005215075, 'global_step': 3}, [])

TensorFlow 2: Đào tạo nhiều nhân viên với các chiến lược phân phối

Trong TensorFlow 2, đào tạo phân tán cho nhiều nhân viên với CPU, GPU và TPU được thực hiện thông qua tf.distribute.Strategy s.

Ví dụ sau minh họa cách sử dụng hai chiến lược như vậy: tf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy , cả hai đều được thiết kế để đào tạo CPU / GPU với nhiều nhân viên.

ParameterServerStrategy sử dụng một điều phối viên ( 'chief' ), điều này làm cho nó thân thiện hơn với môi trường trong sổ ghi chép Colab này. Bạn sẽ sử dụng một số tiện ích ở đây để thiết lập các yếu tố hỗ trợ cần thiết cho trải nghiệm chạy được ở đây: bạn sẽ tạo một cụm trong quá trình , nơi các luồng được sử dụng để mô phỏng các máy chủ tham số ( 'ps' ) và worker ( 'worker' ) . Để biết thêm thông tin về đào tạo máy chủ tham số, hãy tham khảo hướng dẫn đào tạo máy chủ tham số với ParameterServerStrategy .

Trong ví dụ này, trước tiên hãy xác định biến môi trường 'TF_CONFIG' bằng tf.distribute.cluster_resolver.TFConfigClusterResolver để cung cấp thông tin cụm. Nếu bạn đang sử dụng hệ thống quản lý cụm cho đào tạo phân tán của mình, hãy kiểm tra xem nó đã cung cấp 'TF_CONFIG' cho bạn chưa, trong trường hợp đó, bạn không cần đặt biến môi trường này một cách rõ ràng. (Tìm hiểu thêm trong phần Thiết lập biến môi trường 'TF_CONFIG' trong hướng dẫn Đào tạo phân tán với TensorFlow .)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

Sau đó, tạo tf.distribute.Server cho từng công nhân và máy chủ tham số:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

Trong đào tạo phân tán trong thế giới thực, thay vì bắt đầu tất cả tf.distribute.Server trên bộ điều phối, bạn sẽ sử dụng nhiều máy và những máy được chỉ định là "worker""ps" (máy chủ tham số) sẽ sử dụng mỗi máy chạy một tf.distribute.Server . Tham khảo phần Cluster trong thế giới thực trong hướng dẫn đào tạo máy chủ Tham số để biết thêm chi tiết.

Với mọi thứ đã sẵn sàng, hãy tạo đối tượng ParameterServerStrategy :

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Khi bạn đã tạo một đối tượng chiến lược, hãy xác định mô hình, trình tối ưu hóa và các biến khác và gọi Keras Model.compile trong API Strategy.scope để phân phối khóa đào tạo. (Tham khảo tài liệu API Strategy.scope để biết thêm thông tin.)

Ví dụ: nếu bạn muốn tùy chỉnh quá trình huấn luyện của mình bằng cách xác định đường chuyền tiến và lùi, hãy tham khảo phần Huấn luyện với vòng lặp huấn luyện tùy chỉnh trong Hướng dẫn huấn luyện máy chủ tham số để biết thêm chi tiết.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114
2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
{'loss': 3.843122}

Phân vùng ( tf.distribute.experimental.partitioners )

ParameterServerStrategy trong TensorFlow 2 hỗ trợ phân vùng có thể thay đổi và cung cấp các phân vùng giống như TensorFlow 1, với các tên ít gây nhầm lẫn hơn: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : một phân vùng có kích thước tối đa) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : trình phân vùng phân bổ kích thước tối thiểu cho mỗi phân đoạn. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : một trình phân vùng phân bổ một số lượng cố định các phân đoạn.

Ngoài ra, bạn có thể sử dụng đối tượng MultiWorkerMirroredStrategy :

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Bạn có thể thay thế chiến lược được sử dụng ở trên bằng một đối tượng MultiWorkerMirroredStrategy để thực hiện đào tạo với chiến lược này.

Như với các API tf.estimator , vì MultiWorkerMirroredStrategy là một chiến lược nhiều khách hàng, nên không có cách nào dễ dàng để chạy đào tạo phân tán trong sổ ghi chép Colab này. Do đó, việc thay thế đoạn mã trên bằng chiến lược này sẽ khiến mọi thứ chạy cục bộ. Đào tạo nhiều nhân viên với Keras Model.fit / hướng dẫn vòng lặp đào tạo tùy chỉnh trình bày cách chạy đào tạo nhiều nhân viên với biến 'TF_CONFIG' được thiết lập, với hai nhân viên trên một localhost ở Colab. Trên thực tế, bạn sẽ tạo nhiều worker trên các địa chỉ / cổng IP bên ngoài và sử dụng biến 'TF_CONFIG' để chỉ định cấu hình cụm cho từng worker.

Bước tiếp theo

Để tìm hiểu thêm về đào tạo phân tán nhiều nhân viên với tf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy trong TensorFlow 2, hãy xem xét các tài nguyên sau: