Treinamento distribuído

O treinamento distribuído é um tipo de treinamento de modelo em que os requisitos de recursos de computação (por exemplo, CPU, RAM) são distribuídos entre vários computadores. O treinamento distribuído permite treinar mais rapidamente e em conjuntos de dados maiores (até alguns bilhões de exemplos).

O treinamento distribuído também é útil para otimização automatizada de hiperparâmetros em que vários modelos são treinados em paralelo.

Neste documento, você aprenderá como:

  • Treine um modelo TF-DF usando treinamento distribuído.
  • Ajuste os hiperparâmetros de um modelo TF-DF usando treinamento distribuído.

Limitações

A partir de agora, o treinamento distribuído é suportado para:

  • Modelos de árvores com aumento de gradiente de treinamento com tfdf.keras.DistributedGradientBoostedTreesModel . Os modelos Distributed Gradient Boosted Trees são equivalentes às suas contrapartes não distribuídas.
  • Pesquisa de hiperparâmetros para qualquer tipo de modelo TF-DF.

Como ativar o treinamento distribuído

Esta seção lista as etapas para ativar o treinamento distribuído. Para exemplos completos, consulte a próxima seção.

Escopo ParameterServerStrategy

O modelo e o conjunto de dados são definidos em um escopo ParameterServerStrategy .

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

Formato do conjunto de dados

Como no treinamento não distribuído, os conjuntos de dados podem ser fornecidos como

  1. Um conjunto de dados distribuído por tensorflow finito, ou
  2. um caminho para os arquivos de conjunto de dados usando um dos formatos de conjunto de dados compatíveis .

O uso de arquivos fragmentados é significativamente mais simples do que o uso da abordagem de conjunto de dados distribuído por tensorflow finito (1 linha x aproximadamente 20 linhas de código). No entanto, apenas a abordagem do conjunto de dados do tensorflow oferece suporte ao pré-processamento do TensorFlow. Se o pipeline não contiver nenhum pré-processamento, a opção de conjunto de dados fragmentado é recomendada.

Em ambos os casos, o conjunto de dados deve ser fragmentado em vários arquivos para distribuir a leitura do conjunto de dados com eficiência.

Funcionários de configuração

Um processo principal é o programa que executa o código python que define o modelo do TensorFlow. Este processo não está executando nenhuma computação pesada. O cálculo efetivo do treinamento é feito pelos trabalhadores . Os workers são processos que executam um servidor de parâmetros do TensorFlow.

O chefe deve ser configurado com o endereço IP dos trabalhadores. Isso pode ser feito usando a variável de ambiente TF_CONFIG ou criando um ClusterResolver . Consulte Treinamento do servidor de parâmetros com ParameterServerStrategy para obter mais detalhes.

O ParameterServerStrategy do TensorFlow define dois tipos de trabalhadores: "trabalhadores" e "servidor de parâmetros". O TensorFlow requer que pelo menos um de cada tipo de trabalhador seja instanciado. No entanto, o TF-DF usa apenas "trabalhadores". Portanto, um "servidor de parâmetros" precisa ser instanciado, mas não será usado pelo TF-DF. Por exemplo, a configuração de um treinamento TF-DF pode ser a seguinte:

  • 1 chefe
  • 50 Trabalhadores
  • 1 servidor de parâmetros

Os trabalhadores precisam de acesso às operações de treinamento personalizadas do TensorFlow Decision Forests. Existem duas opções para habilitar o acesso:

  1. Use o servidor de parâmetros TF-DF C++ pré-configurado //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server .
  2. Crie um servidor de parâmetros chamando tf.distribute.Server() . Nesse caso, o TF-DF deve ser importado import tensorflow_decision_forests .

Exemplos

Esta seção mostra exemplos completos de configurações de treinamento distribuído. Para mais exemplos, verifique os testes de unidade TF-DF .

Exemplo: treinamento distribuído no caminho do conjunto de dados

Divida seu conjunto de dados em um conjunto de arquivos fragmentados usando um dos formatos de conjunto de dados compatíveis . Recomenda-se nomear os arquivos da seguinte forma: /path/to/dataset/train-<5 digit index>-of-<total files> , por exemplo

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

Para máxima eficiência, o número de arquivos deve ser pelo menos 10x o número de trabalhadores. Por exemplo, se você estiver treinando com 100 funcionários, verifique se o conjunto de dados está dividido em pelo menos 1.000 arquivos.

Os arquivos podem então ser referenciados com uma expressão de sharding como:

  • /caminho/para/conjunto de dados/trem@1000
  • /caminho/para/conjunto de dados/trem@*

O treinamento distribuído é feito da seguinte maneira. Neste exemplo, o conjunto de dados é armazenado como um TFRecord de exemplos do TensorFlow (definido pela chave tfrecord+tfe ).

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

Exemplo: treinamento distribuído em um conjunto de dados distribuído finito do TensorFlow

O TF-DF espera um conjunto de dados do TensorFlow distribuído finito e fragmentado por trabalhador:

  • Distribuído : um conjunto de dados não distribuído é agrupado em strategy.distribute_datasets_from_function .
  • finito : o conjunto de dados deve ler cada exemplo exatamente uma vez. O conjunto de dados não deve conter nenhuma instrução repeat .
  • worker-sharded : Cada trabalhador deve ler uma parte separada do conjunto de dados.

Aqui está um exemplo:

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

Exemplo: ajuste de hiperparâmetro distribuído em um caminho de conjunto de dados

O ajuste de hiperparâmetro distribuído em um caminho de conjunto de dados é semelhante ao treinamento distribuído. A única diferença é que esta opção é compatível com modelos não distribuídos. Por exemplo, você pode distribuir o ajuste de hiperparâmetros do modelo Gradient Boosted Trees (não distribuído).

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

Exemplo: teste de unidade

Para testar a unidade de treinamento distribuído, você pode criar processos de trabalho simulados. Consulte o método _create_in_process_tf_ps_cluster nos testes de unidade TF-DF para obter mais informações.