Voir sur TensorFlow.org | Exécuter dans Google Colab | Voir la source sur GitHub | Télécharger le cahier |
Aperçu
Ce didacticiel illustre la formation multi-travailleurs avec une API de boucle de formation personnalisée, distribuée via MultiWorkerMirroredStrategy, de sorte qu'un modèle Keras conçu pour s'exécuter sur un seul travailleur peut fonctionner de manière transparente sur plusieurs travailleurs avec un changement de code minimal.
Nous utilisons des boucles d'entraînement personnalisées pour entraîner notre modèle car elles nous donnent de la flexibilité et un plus grand contrôle sur l'entraînement. De plus, il est plus facile de déboguer le modèle et la boucle d'apprentissage. Des informations plus détaillées sont disponibles dans Rédaction d'une boucle de formation à partir de zéro .
Si vous cherchez comment utiliser MultiWorkerMirroredStrategy
avec keras model.fit
, reportez-vous plutôt à ce didacticiel .
Le guide Distributed Training in TensorFlow est disponible pour un aperçu des stratégies de distribution prises en charge par TensorFlow pour ceux qui souhaitent approfondir leur compréhension des API tf.distribute.Strategy
.
Installer
Tout d'abord, quelques importations nécessaires.
import json
import os
import sys
Avant d'importer TensorFlow, apportez quelques modifications à l'environnement.
Désactivez tous les GPU. Cela évite les erreurs causées par les travailleurs essayant tous d'utiliser le même GPU. Pour une application réelle, chaque travailleur serait sur une machine différente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Réinitialisez la variable d'environnement TF_CONFIG
, vous en saurez plus plus tard.
os.environ.pop('TF_CONFIG', None)
Assurez-vous que le répertoire actuel se trouve sur le chemin de python. Cela permet au notebook d'importer ultérieurement les fichiers écrits par %%writefile
.
if '.' not in sys.path:
sys.path.insert(0, '.')
Importez maintenant TensorFlow.
import tensorflow as tf
Définition du jeu de données et du modèle
Créez ensuite un fichier mnist.py
avec une configuration simple de modèle et de jeu de données. Ce fichier python sera utilisé par les processus de travail dans ce tutoriel :
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Configuration multi-travailleur
Entrons maintenant dans le monde de la formation multi-travailleurs. Dans TensorFlow, la variable d'environnement TF_CONFIG
est requise pour l'entraînement sur plusieurs machines, chacune ayant éventuellement un rôle différent. TF_CONFIG
utilisé ci-dessous est une chaîne JSON utilisée pour spécifier la configuration du cluster sur chaque travailleur faisant partie du cluster. Il s'agit de la méthode par défaut pour spécifier un cluster, à l'aide cluster_resolver.TFConfigClusterResolver
, mais d'autres options sont disponibles dans le module distribute.cluster_resolver
.
Décrivez votre cluster
Voici un exemple de configuration :
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Voici le même TF_CONFIG
sérialisé en tant que chaîne JSON :
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Il existe deux composants de TF_CONFIG
: cluster
et task
.
cluster
est le même pour tous les travailleurs et fournit des informations sur le cluster de formation, qui est un dict composé de différents types d'emplois tels queworker
. Dans la formation multi-travailleurs avecMultiWorkerMirroredStrategy
, il y a généralement unworker
qui assume un peu plus de responsabilités, comme enregistrer un point de contrôle et écrire un fichier récapitulatif pour TensorBoard en plus de ce qu'unworker
régulier fait. Un tel travailleur est appelé le travailleurchief
, et il est d'usage que leworker
avec l'index
0 soit nomméworker
principal (en fait, c'est ainsi quetf.distribute.Strategy
est implémenté).task
fournit des informations sur la tâche en cours et est différente sur chaque travailleur. Il spécifie letype
et l'index
de ce travailleur.
Dans cet exemple, vous définissez le type
de tâche sur "worker"
et l' index
de tâche sur 0
. Cette machine est le premier ouvrier et sera nommée ouvrier en chef et fera plus de travail que les autres. Notez que d'autres machines devront également avoir la variable d'environnement TF_CONFIG
définie, et elle doit avoir le même dict cluster
, mais un type
de tâche ou index
de tâche différent selon les rôles de ces machines.
À des fins d'illustration, ce tutoriel montre comment on peut définir un TF_CONFIG
avec 2 workers sur localhost
. En pratique, les utilisateurs créeraient plusieurs nœuds de calcul sur des adresses IP/ports externes et définiraient TF_CONFIG
sur chaque nœud de travail de manière appropriée.
Dans cet exemple, vous utiliserez 2 travailleurs, le TF_CONFIG
du premier travailleur est illustré ci-dessus. Pour le deuxième travailleur, vous tf_config['task']['index']=1
Ci-dessus, tf_config
est juste une variable locale en python. Pour l'utiliser réellement pour configurer la formation, ce dictionnaire doit être sérialisé en tant que JSON et placé dans la variable d'environnement TF_CONFIG
.
Variables d'environnement et sous-processus dans les notebooks
Les sous-processus héritent des variables d'environnement de leur parent. Donc, si vous définissez une variable d'environnement dans ce processus de jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Vous pouvez accéder à la variable d'environnement à partir d'un sous-processus :
echo ${GREETINGS}
Hello TensorFlow!
Dans la section suivante, vous l'utiliserez pour transmettre le TF_CONFIG
aux sous-processus de travail. Vous ne lanceriez jamais vraiment vos tâches de cette façon, mais c'est suffisant pour les besoins de ce didacticiel : pour illustrer un exemple minimal multi-travailleur.
MultiWorkerMirroredStrategy
Pour former le modèle, utilisez une instance de tf.distribute.MultiWorkerMirroredStrategy
, qui crée des copies de toutes les variables dans les couches du modèle sur chaque appareil sur tous les travailleurs. Le guide tf.distribute.Strategy
contient plus de détails sur cette stratégie.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Utilisez tf.distribute.Strategy.scope
pour spécifier qu'une stratégie doit être utilisée lors de la création de votre modèle. Cela vous place dans le « contexte de répliques croisées » pour cette stratégie, ce qui signifie que la stratégie contrôle des éléments tels que le placement des variables.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Partage automatique de vos données entre les travailleurs
Dans la formation multi-travailleurs, le partage de l'ensemble de données n'est pas nécessairement nécessaire, mais il vous donne une sémantique unique qui rend plus de formation plus reproductible, c'est-à-dire que la formation sur plusieurs travailleurs doit être la même que la formation sur un seul travailleur. Remarque : les performances peuvent être affectées dans certains cas.
Voir : distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Définir une boucle de formation personnalisée et former le modèle
Spécifier un optimiseur
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Définir une étape d'entraînement avec tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Enregistrement et restauration des points de contrôle
L'implémentation de points de contrôle dans une boucle d'entraînement personnalisée nécessite que l'utilisateur le gère au lieu d'utiliser un rappel keras. Il vous permet de sauvegarder les poids du modèle et de les restaurer sans avoir à sauvegarder le modèle entier.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Ici, vous allez créer un tf.train.Checkpoint
qui suit le modèle, qui est géré par un tf.train.CheckpointManager
afin que seul le dernier point de contrôle soit conservé.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Maintenant, lorsque vous avez besoin de restaurer, vous pouvez trouver le dernier point de contrôle enregistré à l'aide de la fonction pratique tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
Après avoir restauré le point de contrôle, vous pouvez continuer à entraîner votre boucle d'entraînement personnalisée.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Configuration complète du code sur les travailleurs
Pour exécuter réellement avec MultiWorkerMirroredStrategy
, vous devez exécuter des processus de travail et leur transmettre un TF_CONFIG
.
Comme le fichier mnist.py
écrit précédemment, voici le main.py
qui contient le même code que nous avons parcouru étape par étape précédemment dans ce colab, nous l'écrivons simplement dans un fichier afin que chacun des travailleurs l'exécute :
Fichier: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Former et évaluer
Le répertoire courant contient maintenant les deux fichiers Python :
ls *.py
main.py mnist.py
Donc json-sérialisez le TF_CONFIG
et ajoutez-le aux variables d'environnement :
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Maintenant, vous pouvez lancer un processus de travail qui exécutera le main.py
et utilisera TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Il y a quelques points à noter à propos de la commande ci-dessus :
- Il utilise le
%%bash
qui est un bloc- notes "magique" pour exécuter certaines commandes bash. - Il utilise l'indicateur
--bg
pour exécuter le processusbash
en arrière-plan, car ce travailleur ne se terminera pas. Il attend tous les travailleurs avant de commencer.
Le processus de travail en arrière-plan n'imprime pas la sortie sur ce bloc-notes, donc le &>
redirige sa sortie vers un fichier, afin que vous puissiez voir ce qui s'est passé.
Alors, attendez quelques secondes que le processus démarre :
import time
time.sleep(20)
Regardez maintenant ce qui a été généré dans le fichier journal du travailleur jusqu'à présent :
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
La dernière ligne du fichier journal doit indiquer : Started server with target: grpc://localhost:12345
. Le premier travailleur est maintenant prêt et attend que tous les autres travailleurs soient prêts à continuer.
Donc, mettez à jour le tf_config
pour que le processus du deuxième travailleur récupère :
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Lancez maintenant le deuxième ouvrier. Cela démarrera la formation puisque tous les travailleurs sont actifs (il n'est donc pas nécessaire de mettre ce processus en arrière-plan) :
python main.py > /dev/null 2>&1
Maintenant, si vous revérifiez les journaux écrits par le premier nœud de calcul, vous verrez qu'il a participé à la formation de ce modèle :
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Formation multi-travailleurs approfondie
Ce didacticiel a démontré un flux de travail de Custom Training Loop
de la configuration multi-travailleur. Une description détaillée d'autres sujets est disponible dans le model.fit's guide
de la configuration multi-travailleur et applicable aux CTL.
Voir également
- Le guide Distributed Training in TensorFlow fournit un aperçu des stratégies de distribution disponibles.
- Modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.
- La section Performances du guide fournit des informations sur d'autres stratégies et outils que vous pouvez utiliser pour optimiser les performances de vos modèles TensorFlow.