Ciclo di formazione personalizzato con Keras e MultiWorkerMirroredStrategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

Panoramica

Questo tutorial illustra la formazione per più lavoratori con l'API del ciclo di formazione personalizzata, distribuita tramite MultiWorkerMirroredStrategy, in modo che un modello Keras progettato per essere eseguito su un singolo lavoratore possa funzionare senza problemi su più lavoratori con una modifica minima del codice.

Utilizziamo cicli di allenamento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'allenamento. Inoltre, è più semplice eseguire il debug del modello e del ciclo di addestramento. Informazioni più dettagliate sono disponibili in Scrittura di un ciclo di formazione da zero .

Se stai cercando come utilizzare MultiWorkerMirroredStrategy con keras model.fit , fai invece riferimento a questo tutorial .

La guida Distributed Training in TensorFlow è disponibile per una panoramica delle strategie di distribuzione supportate da TensorFlow per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy .

Impostare

Innanzitutto, alcune importazioni necessarie.

import json
import os
import sys

Prima di importare TensorFlow, apportare alcune modifiche all'ambiente.

Disabilita tutte le GPU. Ciò previene gli errori causati dai lavoratori che tentano tutti di utilizzare la stessa GPU. Per un'applicazione reale ogni lavoratore si troverebbe su una macchina diversa.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Reimposta la variabile di ambiente TF_CONFIG , ne vedrai di più in seguito.

os.environ.pop('TF_CONFIG', None)

Assicurati che la directory corrente sia sul percorso di Python. Ciò consente al notebook di importare i file scritti da %%writefile secondo momento.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ora importa TensorFlow.

import tensorflow as tf

Dataset e definizione del modello

Quindi crea un file mnist.py con un modello semplice e una configurazione del set di dati. Questo file python verrà utilizzato dai processi di lavoro in questo tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configurazione multi-operatore

Entriamo ora nel mondo della formazione multi-lavoratore. In TensorFlow, la variabile di ambiente TF_CONFIG è richiesta per l'addestramento su più macchine, ognuna delle quali può avere un ruolo diverso. TF_CONFIG utilizzata di seguito è una stringa JSON utilizzata per specificare la configurazione del cluster su ogni lavoratore che fa parte del cluster. Questo è il metodo predefinito per specificare un cluster, utilizzando cluster_resolver.TFConfigClusterResolver , ma sono disponibili altre opzioni nel modulo distribute.cluster_resolver .

Descrivi il tuo cluster

Ecco un esempio di configurazione:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ecco lo stesso TF_CONFIG serializzato come stringa JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Ci sono due componenti di TF_CONFIG : cluster e task .

  • cluster è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavoro come worker . Nella formazione per più lavoratori con MultiWorkerMirroredStrategy , di solito c'è un worker che si assume un po' più di responsabilità come il salvataggio del checkpoint e la scrittura di un file di riepilogo per TensorBoard oltre a ciò che fa un normale worker . Tale lavoratore è indicato come chief lavoratore ed è consuetudine che il worker con index 0 sia nominato capo worker (infatti è così che viene implementato tf.distribute.Strategy ).

  • l' task fornisce informazioni sull'attività corrente ed è diversa per ogni lavoratore. Specifica il type e l' index di quel lavoratore.

In questo esempio, imposti il type di attività su "worker" e l' index di attività su 0 . Questa macchina è il primo lavoratore e sarà nominato capo lavoratore e farà più lavoro degli altri. Si noti che anche altre macchine dovranno avere la variabile di ambiente TF_CONFIG impostata, e dovrebbe avere lo stesso cluster dict, ma diverso type di attività o index di attività a seconda dei ruoli di quelle macchine.

A scopo illustrativo, questo tutorial mostra come è possibile impostare un TF_CONFIG con 2 worker su localhost . In pratica, gli utenti creerebbero più lavoratori su indirizzi/porte IP esterni e imposterebbero TF_CONFIG su ciascun lavoratore in modo appropriato.

In questo esempio utilizzerai 2 lavoratori, il TF_CONFIG del primo lavoratore è mostrato sopra. Per il secondo lavoratore devi impostare tf_config['task']['index']=1

Sopra, tf_config è solo una variabile locale in python. Per utilizzarlo effettivamente per configurare l'addestramento, questo dizionario deve essere serializzato come JSON e inserito nella variabile di ambiente TF_CONFIG .

Variabili d'ambiente e sottoprocessi nei notebook

I sottoprocessi ereditano le variabili di ambiente dal loro genitore. Quindi, se imposti una variabile di ambiente in questo processo jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

È possibile accedere alla variabile di ambiente da un sottoprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Nella prossima sezione, lo utilizzerai per passare TF_CONFIG ai sottoprocessi di lavoro. Non avvieresti mai i tuoi lavori in questo modo, ma è sufficiente per gli scopi di questo tutorial: Per dimostrare un esempio minimo di più lavoratori.

MultiWorkerMirroredStrategy

Per addestrare il modello, usa un'istanza di tf.distribute.MultiWorkerMirroredStrategy , che crea copie di tutte le variabili nei livelli del modello su ogni dispositivo in tutti i worker. La guida tf.distribute.Strategy contiene maggiori dettagli su questa strategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Utilizzare tf.distribute.Strategy.scope per specificare che deve essere utilizzata una strategia durante la creazione del modello. Questo ti mette nel " contesto di replica incrociata " per questa strategia, il che significa che la strategia ha il controllo di cose come il posizionamento delle variabili.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Condivisione automatica dei dati tra i dipendenti

Nella formazione multi-lavoratore, lo sharding del set di dati non è necessariamente necessario, tuttavia fornisce una semantica esatta che rende più riproducibile una formazione maggiore, ad esempio la formazione su più lavoratori dovrebbe essere uguale alla formazione su un lavoratore. Nota: le prestazioni possono essere influenzate in alcuni casi.

Vedi: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Definisci il ciclo di addestramento personalizzato e addestra il modello

Specificare un ottimizzatore

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Definisci una fase di allenamento con tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Salvataggio e ripristino del checkpoint

L'implementazione del checkpoint in un ciclo di formazione personalizzato richiede che l'utente lo gestisca invece di usare un callback keras. Consente di salvare i pesi del modello e ripristinarli senza dover salvare l'intero modello.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Qui creerai un tf.train.Checkpoint che tiene traccia del modello, che è gestito da un tf.train.CheckpointManager in modo che venga conservato solo il checkpoint più recente.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Ora, quando devi ripristinare, puoi trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Dopo aver ripristinato il checkpoint, puoi continuare ad addestrare il tuo ciclo di allenamento personalizzato.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Configurazione del codice completo sui lavoratori

Per eseguire effettivamente MultiWorkerMirroredStrategy dovrai eseguire i processi di lavoro e passare loro un TF_CONFIG .

Come il file mnist.py scritto in precedenza, ecco il main.py che contiene lo stesso codice che abbiamo esaminato passo dopo passo in precedenza in questa colab, lo stiamo semplicemente scrivendo in un file in modo che ciascuno dei lavoratori lo eseguirà:

File: main.py

Writing main.py

Allena e valuta

La directory corrente ora contiene entrambi i file Python:

ls *.py
main.py
mnist.py

Quindi json-serializza TF_CONFIG e aggiungilo alle variabili di ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora puoi avviare un processo di lavoro che eseguirà main.py e utilizzerà TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ci sono alcune cose da notare sul comando precedente:

  1. Usa %%bash che è una "magia" del notebook per eseguire alcuni comandi bash.
  2. Utilizza il flag --bg per eseguire il processo bash in background, perché questo lavoratore non verrà terminato. Aspetta tutti i lavoratori prima di iniziare.

Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &> reindirizza il suo output a un file, in modo da poter vedere cosa è successo.

Quindi, attendi qualche secondo affinché il processo si avvii:

import time
time.sleep(20)

Ora guarda cosa è stato prodotto finora nel file di registro del lavoratore:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

L'ultima riga del file di registro dovrebbe dire: Started server with target: grpc://localhost:12345 . Il primo lavoratore è ora pronto e attende che tutti gli altri lavoratori siano pronti per procedere.

Quindi aggiorna tf_config affinché il processo del secondo lavoratore raccolga:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora avvia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire il background di questo processo):

python main.py > /dev/null 2>&1

Ora se ricontrolli i log scritti dal primo lavoratore vedrai che ha partecipato all'addestramento di quel modello:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formazione multi-lavoratore approfondita

Questo tutorial ha dimostrato un flusso di lavoro Custom Training Loop della configurazione multi-lavoratore. Una descrizione dettagliata di altri argomenti è disponibile nella model.fit's guide configurazione multi-lavoratore e applicabile ai CTL.

Guarda anche

  1. Formazione distribuita nella guida TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
  2. Modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
  3. La sezione Prestazioni nella guida fornisce informazioni su altre strategie e strumenti che puoi utilizzare per ottimizzare le prestazioni dei tuoi modelli TensorFlow.