Migracja szkolenia CPU/GPU dla wielu pracowników

Ten przewodnik pokazuje, jak przeprowadzić migrację rozproszonego przepływu pracy szkolenia obejmującego wielu pracowników z TensorFlow 1 do TensorFlow 2.

Aby przeprowadzić szkolenie wieloosobowe z wykorzystaniem procesorów/GPU:


Zacznij od kilku niezbędnych importów i prostego zestawu danych do celów demonstracyjnych:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
= [[0.3], [0.5], [0.7]]
= [[4., 4.5], [5., 5.5], [6., 6.5]]
= [[0.8], [0.9], [1.]]

Do szkolenia na wielu komputerach w TensorFlow potrzebna będzie zmienna środowiskowa konfiguracji 'TF_CONFIG' . Użyj 'TF_CONFIG' aby określić adresy 'cluster' i 'task' . (Dowiedz się więcej w przewodniku Distributed_training ).

import json
import os

= {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
'task': {'type': 'chief', 'index': 0}

.environ['TF_CONFIG'] = json.dumps(tf_config)

Użyj instrukcji del , aby usunąć zmienną (ale w prawdziwym szkoleniu dla wielu pracowników w TensorFlow 1, nie musisz tego robić):

del os.environ['TF_CONFIG']

TensorFlow 1: Szkolenie rozproszone dla wielu pracowników z interfejsami API tf.estimator

Poniższy fragment kodu demonstruje kanoniczny przepływ pracy szkolenia wielu pracowników w TF1: do dystrybucji użyjesz interfejsów API tf.estimator.Estimator , tf.estimator.TrainSpec , tf.estimator.EvalSpec i tf.estimator.train_and_evaluate trening:

def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
= tf1.layers.Dense(1)(features)
= tf1.losses.mean_squared_error(labels=labels, predictions=logits)
= tf1.train.AdagradOptimizer(0.05)
= optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

= tf1.estimator.Estimator(model_fn=_model_fn)
= tf1.estimator.TrainSpec(input_fn=_input_fn)
= tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
TensorFlow 2: Szkolenie dla wielu pracowników ze strategiami dystrybucji

W TensorFlow 2 rozproszone szkolenie dla wielu pracowników z procesorami CPU, GPU i TPU odbywa się za pośrednictwem tf.distribute.Strategy s.

Poniższy przykład pokazuje, jak używać dwóch takich strategii: tf.distribute.experimental.ParameterServerStrategy i tf.distribute.MultiWorkerMirroredStrategy , z których obie są przeznaczone do szkolenia procesora/GPU z wieloma pracownikami.

ParameterServerStrategy zatrudnia koordynatora ( 'chief' ), co czyni go bardziej przyjaznym dla środowiska w tym notebooku Colab. Użyjesz tutaj kilku narzędzi, aby skonfigurować elementy pomocnicze niezbędne do działania tutaj: utworzysz klaster w procesie , w którym wątki są używane do symulacji serwerów parametrów ( 'ps' ) i pracowników ( 'worker' ) . Aby uzyskać więcej informacji na temat uczenia serwera parametrów, zapoznaj się z samouczkiem Uczenie serwera parametrów za pomocą ParameterServerStrategy .

W tym przykładzie najpierw zdefiniuj zmienną środowiskową 'TF_CONFIG' za pomocą tf.distribute.cluster_resolver.TFConfigClusterResolver , aby udostępnić informacje o klastrze. Jeśli używasz systemu zarządzania klastrami do szkolenia rozproszonego, sprawdź, czy zapewnia on już 'TF_CONFIG' , w którym to przypadku nie musisz jawnie ustawiać tej zmiennej środowiskowej. (Dowiedz się więcej w sekcji Konfigurowanie zmiennej środowiskowej 'TF_CONFIG' w przewodniku Szkolenie rozproszone z TensorFlow ).

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

= portpicker.pick_unused_port()
= [portpicker.pick_unused_port() for _ in range(3)]
= [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
= {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps':  ["localhost:%s" % port for port in ps_ports],
'task': {'type': 'chief', 'index': 0}
.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
= tf.distribute.cluster_resolver.TFConfigClusterResolver()

Następnie utwórz tf.distribute.Server dla procesów roboczych i serwerów parametrów:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
= tf.compat.v1.ConfigProto()
.inter_op_parallelism_threads = 4

for i in range(3):

for i in range(2):

W szkoleniu rozproszonym w rzeczywistym świecie zamiast uruchamiać wszystkie tf.distribute.Server na koordynatorze, będziesz korzystać z wielu komputerów, a te, które są oznaczone jako "worker" i "ps" (serwery parametrów) będą uruchom tf.distribute.Server . Aby uzyskać więcej informacji, zapoznaj się z sekcją Klastry w świecie rzeczywistym w samouczku dotyczącym szkolenia serwera parametrów .

Gdy wszystko jest gotowe, utwórz obiekt ParameterServerStrategy :

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Po utworzeniu obiektu strategii zdefiniuj model, optymalizator i inne zmienne, a następnie wywołaj Keras Model.compile w interfejsie API Strategy.scope w celu dystrybucji szkolenia. (Więcej informacji można znaleźć w dokumentacji API Strategy.scope ).

Jeśli wolisz dostosować swój trening, na przykład poprzez zdefiniowanie podań do przodu i do tyłu, zapoznaj się z sekcją Trening z niestandardową pętlą treningową w samouczku dotyczącym treningu serwera parametrów, aby uzyskać więcej informacji.

dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)

= tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
= tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
= tf.keras.optimizers.Adagrad(learning_rate=0.05)
.compile(optimizer, "mse")

.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
{'loss': 3.843122}

Partycje ( tf.distribute.experimental.partitioners )

ParameterServerStrategy w TensorFlow 2 obsługuje partycjonowanie zmiennych i oferuje te same partycje co TensorFlow 1, z mniej mylącymi nazwami: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : partycjoner, który utrzymuje maksymalne rozmiary) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : partycjoner, który przydziela minimalny rozmiar na fragment. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : partycjoner, który przydziela ustaloną liczbę odłamków.

Alternatywnie możesz użyć obiektu MultiWorkerMirroredStrategy :

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
= tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Strategię zastosowaną powyżej można zastąpić obiektem MultiWorkerMirroredStrategy , aby przeprowadzić szkolenie przy użyciu tej strategii.

Podobnie jak w przypadku interfejsów API tf.estimator , ponieważ MultiWorkerMirroredStrategy jest strategią dla wielu klientów, nie ma łatwego sposobu na prowadzenie rozproszonych szkoleń w tym notatniku Colab. Dlatego zastąpienie powyższego kodu tą strategią kończy się uruchamianiem rzeczy lokalnie. Szkolenie dla wielu pracowników z Keras Model.fit / samouczki z niestandardową pętlą szkoleniową pokazują, jak uruchomić szkolenie dla wielu pracowników z ustawioną zmienną 'TF_CONFIG' , z dwoma pracownikami na hoście lokalnym w Colab. W praktyce należy utworzyć wiele procesów roboczych na zewnętrznych adresach IP/portach i użyć zmiennej 'TF_CONFIG' , aby określić konfigurację klastra dla każdego procesu roboczego.

Następne kroki

Aby dowiedzieć się więcej o rozproszonym szkoleniu dla wielu pracowników za pomocą tf.distribute.experimental.ParameterServerStrategy i tf.distribute.MultiWorkerMirroredStrategy w TensorFlow 2, zapoznaj się z następującymi zasobami: