Entraînement distribué

La formation distribuée est un type de formation modèle dans laquelle les besoins en ressources informatiques (par exemple, CPU, RAM) sont répartis entre plusieurs ordinateurs. La formation distribuée permet de s'entraîner plus rapidement et sur des ensembles de données plus volumineux (jusqu'à quelques milliards d'exemples).

La formation distribuée est également utile pour l'optimisation automatisée des hyper-paramètres où plusieurs modèles sont formés en parallèle.

Dans ce document, vous apprendrez comment :

  • Entraînez un modèle TF-DF à l’aide de la formation distribuée.
  • Ajustez les hyper-paramètres d'un modèle TF-DF à l'aide d'une formation distribuée.

Limites

À partir de maintenant, la formation distribuée est prise en charge pour :

  • Entraînement de modèles d'arbres boostés par dégradé avec tfdf.keras.DistributedGradientBoostedTreesModel . Les modèles d'arbres à gradient boosté distribués sont équivalents à leurs homologues non distribués.
  • Recherche d'hyper-paramètres pour tout type de modèle TF-DF.

Comment activer la formation distribuée

Cette section répertorie les étapes pour activer la formation distribuée. Pour des exemples complets, consultez la section suivante.

Portée ParameterServerStrategy

Le modèle et l'ensemble de données sont définis dans une portée ParameterServerStrategy .

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

Format de l'ensemble de données

Comme pour les formations non distribuées, les ensembles de données peuvent être fournis sous forme de

  1. Un ensemble de données distribuées à tensorflow fini, ou
  2. un chemin d'accès aux fichiers de l'ensemble de données en utilisant l'un des formats d'ensemble de données compatibles .

L'utilisation de fichiers fragmentés est nettement plus simple que l'utilisation de l'approche d'ensemble de données distribuées Tensorflow fini (1 ligne contre ~ 20 lignes de code). Cependant, seule l'approche des ensembles de données Tensorflow prend en charge le prétraitement TensorFlow. Si votre pipeline ne contient aucun prétraitement, l'option de jeu de données fragmenté est recommandée.

Dans les deux cas, l’ensemble de données doit être divisé en plusieurs fichiers pour répartir efficacement la lecture de l’ensemble de données.

Travailleurs d'installation

L'un des processus principaux est le programme exécutant le code Python qui définit le modèle TensorFlow. Ce processus n’exécute aucun calcul lourd. Le calcul efficace de la formation est effectué par les travailleurs . Les travailleurs sont des processus exécutant un serveur de paramètres TensorFlow.

Le chef doit être configuré avec l'adresse IP des travailleurs. Cela peut être fait en utilisant la variable d'environnement TF_CONFIG ou en créant un ClusterResolver . Voir Formation du serveur de paramètres avec ParameterServerStrategy pour plus de détails.

ParameterServerStrategy de TensorFlow définit deux types de nœuds de calcul : « travailleurs » et « serveur de paramètres ». TensorFlow nécessite qu'au moins un travailleur de chaque type soit instancié. Cependant, TF-DF n'utilise que des « travailleurs ». Ainsi, un « serveur de paramètres » doit être instancié mais ne sera pas utilisé par TF-DF. Par exemple, la configuration d'une formation TF-DF pourrait ressembler à ceci :

  • 1 chef
  • 50 travailleurs
  • 1 serveur de paramètres

Les travailleurs doivent avoir accès aux opérations de formation personnalisées de TensorFlow Decision Forests. Il existe deux options pour activer l'accès :

  1. Utilisez le serveur de paramètres TF-DF C++ préconfiguré //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server .
  2. Créez un serveur de paramètres en appelant tf.distribute.Server() . Dans ce cas, TF-DF doit être importé import tensorflow_decision_forests .

Exemples

Cette section présente des exemples complets de configurations de formation distribuées. Pour plus d'exemples, consultez les tests unitaires TF-DF .

Exemple : entraînement distribué sur le chemin de l'ensemble de données

Divisez votre ensemble de données en un ensemble de fichiers fragmentés à l'aide de l'un des formats d'ensemble de données compatibles . Il est recommandé de nommer les fichiers comme suit : /path/to/dataset/train-<5 digit index>-of-<total files> , par exemple

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

Pour une efficacité maximale, le nombre de fichiers doit être au moins 10 fois supérieur au nombre de travailleurs. Par exemple, si vous formez avec 100 travailleurs, assurez-vous que l'ensemble de données est divisé en au moins 1 000 fichiers.

Les fichiers peuvent ensuite être référencés avec une expression de partitionnement telle que :

  • /chemin/vers/ensemble de données/train@1000
  • /chemin/vers/ensemble de données/train@*

La formation distribuée se déroule comme suit. Dans cet exemple, l'ensemble de données est stocké en tant que TFRecord d'exemples TensorFlow (défini par la clé tfrecord+tfe ).

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

Exemple : entraînement distribué sur un ensemble de données distribué TensorFlow fini

TF-DF s'attend à un ensemble de données TensorFlow distribué et fragmenté par un travailleur fini :

  • Distribué : un ensemble de données non distribué est enveloppé dans strategy.distribute_datasets_from_function .
  • finite : l'ensemble de données doit lire chaque exemple exactement une fois. L'ensemble de données ne doit contenir aucune instruction repeat .
  • worker-sharded : chaque travailleur doit lire une partie distincte de l'ensemble de données.

Voici un exemple :

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

Exemple : réglage distribué des hyperparamètres sur un chemin d'ensemble de données

Le réglage distribué des hyperparamètres sur un chemin d'ensemble de données est similaire à la formation distribuée. La seule différence est que cette option est compatible avec les modèles non distribués. Par exemple, vous pouvez distribuer le réglage des hyperparamètres du modèle (non distribué) Gradient Boosted Trees.

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

Exemple : tests unitaires

Pour tester unitairement une formation distribuée, vous pouvez créer des processus de travail simulés. Voir la méthode _create_in_process_tf_ps_cluster dans les tests unitaires TF-DF pour plus d'informations.