Entrenamiento Distribuido

El entrenamiento distribuido es un tipo de entrenamiento modelo en el que los requisitos de recursos informáticos (p. ej., CPU, RAM) se distribuyen entre varios ordenadores. El entrenamiento distribuido permite entrenar más rápido y en conjuntos de datos más grandes (hasta unos pocos miles de millones de ejemplos).

El entrenamiento distribuido también es útil para la optimización automatizada de hiperparámetros donde se entrenan varios modelos en paralelo.

En este documento aprenderá a:

  • Entrene un modelo TF-DF usando entrenamiento distribuido.
  • Ajuste los hiperparámetros de un modelo TF-DF mediante entrenamiento distribuido.

Limitaciones

A partir de ahora, la capacitación distribuida es compatible con:

  • Entrenamiento de modelos Gradient Boosted Trees con tfdf.keras.DistributedGradientBoostedTreesModel . Los modelos Distributed Gradient Boosted Trees son equivalentes a sus contrapartes no distribuidas.
  • Búsqueda de hiperparámetros para cualquier tipo de modelo TF-DF.

Cómo habilitar el entrenamiento distribuido

Esta sección enumera los pasos para habilitar el entrenamiento distribuido. Para ver ejemplos completos, consulte la siguiente sección.

Alcance de ParameterServerStrategy

El modelo y el conjunto de datos se definen en un ámbito ParameterServerStrategy .

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

Formato del conjunto de datos

Al igual que para la formación no distribuida, los conjuntos de datos se pueden proporcionar como

  1. Un conjunto de datos distribuido de flujo de tensor finito, o
  2. una ruta a los archivos del conjunto de datos utilizando uno de los formatos de conjuntos de datos compatibles .

Usar archivos fragmentados es significativamente más simple que usar el enfoque de conjuntos de datos distribuidos de flujo tensor finito (1 línea frente a ~20 líneas de código). Sin embargo, solo el enfoque del conjunto de datos de tensorflow admite el preprocesamiento de TensorFlow. Si su canalización no contiene ningún procesamiento previo, se recomienda la opción de conjunto de datos fragmentado.

En ambos casos, el conjunto de datos debe fragmentarse en varios archivos para distribuir la lectura del conjunto de datos de manera eficiente.

Trabajadores de configuración

Un proceso principal es el programa que ejecuta el código Python que define el modelo TensorFlow. Este proceso no está ejecutando ningún cálculo pesado. El cómputo de la formación efectiva lo realizan los trabajadores . Los trabajadores son procesos que ejecutan un servidor de parámetros de TensorFlow.

El jefe debe configurarse con la dirección IP de los trabajadores. Esto se puede hacer usando la variable de entorno TF_CONFIG o creando un ClusterResolver . Consulte Entrenamiento del servidor de parámetros con ParameterServerStrategy para obtener más detalles.

ParameterServerStrategy de TensorFlow define dos tipos de trabajadores: "trabajadores" y "servidor de parámetros". TensorFlow requiere que se cree una instancia de al menos uno de cada tipo de trabajador. Sin embargo, TF-DF solo usa "trabajadores". Por lo tanto, se debe crear una instancia de un "servidor de parámetros", pero TF-DF no lo utilizará. Por ejemplo, la configuración de un entrenamiento TF-DF podría tener el siguiente aspecto:

  • 1 jefe
  • 50 trabajadores
  • 1 servidor de parámetros

Los trabajadores necesitan acceso a las operaciones de capacitación personalizadas de TensorFlow Decision Forests. Hay dos opciones para habilitar el acceso:

  1. Utilice el servidor de parámetros TF-DF C++ preconfigurado //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server .
  2. Cree un servidor de parámetros llamando tf.distribute.Server() . En este caso, se debe importar TF-DF import tensorflow_decision_forests .

Ejemplos

Esta sección muestra ejemplos completos de configuraciones de entrenamiento distribuidas. Para ver más ejemplos, consulte las pruebas unitarias de TF-DF .

Ejemplo: entrenamiento distribuido en la ruta del conjunto de datos

Divida su conjunto de datos en un conjunto de archivos fragmentados utilizando uno de los formatos de conjuntos de datos compatibles . Se recomienda nombrar los archivos de la siguiente manera: /path/to/dataset/train-<5 digit index>-of-<total files> , por ejemplo

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

Para una máxima eficiencia, la cantidad de archivos debe ser al menos 10 veces mayor que la cantidad de trabajadores. Por ejemplo, si está capacitando a 100 trabajadores, asegúrese de que el conjunto de datos esté dividido en al menos 1000 archivos.

A continuación, se puede hacer referencia a los archivos con una expresión de fragmentación como:

  • /ruta/al/conjunto de datos/tren@1000
  • /ruta/al/conjunto de datos/tren@*

El entrenamiento distribuido se realiza de la siguiente manera. En este ejemplo, el conjunto de datos se almacena como un TFRecord de ejemplos de TensorFlow (definido por la clave tfrecord+tfe ).

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

Ejemplo: entrenamiento distribuido en un conjunto de datos distribuido finito de TensorFlow

TF-DF espera un conjunto de datos de TensorFlow fragmentado por trabajador finito distribuido:

  • Distribuido : un conjunto de datos no distribuido se envuelve en strategy.distribute_datasets_from_function .
  • finito : el conjunto de datos debe leer cada ejemplo exactamente una vez. El conjunto de datos no debe contener instrucciones repeat .
  • worker-sharded : cada trabajador debe leer una parte separada del conjunto de datos.

Aquí hay un ejemplo:

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

Ejemplo: ajuste de hiperparámetro distribuido en una ruta de conjunto de datos

El ajuste de hiperparámetros distribuidos en una ruta de conjunto de datos es similar al entrenamiento distribuido. La única diferencia es que esta opción es compatible con modelos no distribuidos. Por ejemplo, puede distribuir el ajuste de hiperparámetros del modelo Gradient Boosted Trees (no distribuido).

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

Ejemplo: prueba unitaria

Para realizar pruebas unitarias de capacitación distribuida, puede crear procesos de trabajo simulados. Consulte el método _create_in_process_tf_ps_cluster en las pruebas unitarias de TF-DF para obtener más información.