Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar libreta |
Descripción general
El entrenamiento del servidor de parámetros es un método paralelo de datos común para escalar el entrenamiento del modelo en varias máquinas.
Un clúster de entrenamiento de servidor de parámetros consta de trabajadores y servidores de parámetros . Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. De forma predeterminada, los trabajadores leen y actualizan estas variables de forma independiente sin sincronizarse entre sí. Esta es la razón por la cual, a veces, el entrenamiento estilo servidor de parámetros se denomina entrenamiento asíncrono .
En TensorFlow 2, el entrenamiento del servidor de parámetros funciona con la clase tf.distribute.experimental.ParameterServerStrategy
, que distribuye los pasos de entrenamiento a un clúster que se amplía a miles de trabajadores (acompañado de servidores de parámetros).
Métodos de entrenamiento compatibles
Hay dos métodos principales de entrenamiento admitidos:
- La API Keras
Model.fit
, que se recomienda cuando se prefiere una abstracción y un manejo del entrenamiento de alto nivel. - Un bucle de entrenamiento personalizado (puede consultar Entrenamiento personalizado , Escribir un bucle de entrenamiento desde cero y Bucle de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy para obtener más detalles). Se recomienda el entrenamiento de bucle personalizado cuando prefiera definir los detalles de su bucle de entrenamiento.
Un clúster con trabajos y tareas
Independientemente de la API elegida ( Model.fit
o un ciclo de entrenamiento personalizado), el entrenamiento distribuido en TensorFlow 2 implica: un 'cluster'
con varios 'jobs'
, y cada uno de los trabajos puede tener una o más 'tasks'
.
Al usar el entrenamiento del servidor de parámetros, se recomienda tener:
- Un trabajo de coordinador (que tiene el nombre de trabajo
chief
) - Múltiples trabajos de trabajador (nombre de trabajo
worker
); y - Varios trabajos de servidor de parámetros (nombre de trabajo
ps
)
Mientras que el coordinador crea recursos, distribuye tareas de capacitación, escribe puntos de control y se ocupa de fallas de tareas, los trabajadores y los servidores de parámetros ejecutan tf.distribute.Server
que escucha las solicitudes del coordinador.
Entrenamiento del servidor de parámetros con la API Model.fit
El entrenamiento del servidor de parámetros con la API de Model.fit
requiere que el coordinador use un objeto tf.distribute.experimental.ParameterServerStrategy
y un tf.keras.utils.experimental.DatasetCreator
como entrada. Similar al uso de Model.fit
sin estrategia, o con otras estrategias, el flujo de trabajo implica crear y compilar el modelo, preparar las devoluciones de llamada, seguidas de una llamada Model.fit
.
Entrenamiento del servidor de parámetros con un bucle de entrenamiento personalizado
Con bucles de entrenamiento personalizados, la clase tf.distribute.experimental.coordinator.ClusterCoordinator
es el componente clave que se usa para el coordinador.
- La clase
ClusterCoordinator
debe funcionar junto con un objetotf.distribute.Strategy
. - Este objeto
tf.distribute.Strategy
es necesario para proporcionar la información del clúster y se usa para definir un paso de entrenamiento, como se demuestra en Entrenamiento personalizado con tf.distribute.Strategy . - Luego, el objeto
ClusterCoordinator
envía la ejecución de estos pasos de capacitación a los trabajadores remotos. - Para el entrenamiento del servidor de parámetros, el
ClusterCoordinator
necesita trabajar con untf.distribute.experimental.ParameterServerStrategy
.
La API más importante proporcionada por el objeto ClusterCoordinator
es la schedule
:
- La API de
schedule
pone en cola unatf.function
y devuelve unRemoteValue
similar al futuro inmediatamente. - Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y sus
RemoteValue
s se completarán de forma asíncrona. - Dado que la
schedule
no requiere la asignación de trabajadores, latf.function
se puede ejecutar en cualquier trabajador disponible. - Si el trabajador en el que se ejecuta deja de estar disponible antes de su finalización, la función se volverá a intentar en otro trabajador disponible.
- Debido a este hecho y al hecho de que la ejecución de la función no es atómica, una función puede ejecutarse más de una vez.
Además de enviar funciones remotas, ClusterCoordinator
también ayuda a crear conjuntos de datos sobre todos los trabajadores y reconstruir estos conjuntos de datos cuando un trabajador se recupera de una falla.
Configuración del tutorial
El tutorial se ramificará en Model.fit
y rutas de bucle de entrenamiento personalizadas, y puede elegir la que mejor se adapte a sus necesidades. Las secciones que no sean "Entrenamiento con X" son aplicables a ambos caminos.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Configuración del clúster
Como se mencionó anteriormente, un clúster de entrenamiento de servidor de parámetros requiere una tarea de coordinador que ejecute su programa de entrenamiento, uno o varios trabajadores y tareas de servidor de parámetros que ejecuten servidores de TensorFlow ( tf.distribute.Server
y posiblemente una tarea de evaluación adicional que ejecute una evaluación adicional. (consulte la sección de evaluación del sidecar a continuación). Los requisitos para configurarlos son:
- La tarea del coordinador necesita conocer las direcciones y los puertos de todos los demás servidores de TensorFlow, excepto el evaluador.
- Los trabajadores y los servidores de parámetros necesitan saber qué puerto deben escuchar. En aras de la simplicidad, generalmente puede pasar la información completa del clúster al crear servidores TensorFlow en estas tareas.
- La tarea del evaluador no tiene que conocer la configuración del clúster de entrenamiento. Si lo hace, no debería intentar conectarse al clúster de entrenamiento.
- Los trabajadores y los servidores de parámetros deben tener tipos de tareas como
"worker"
y"ps"
, respectivamente. El coordinador debe usar"chief"
como tipo de tarea por razones heredadas.
En este tutorial, creará un clúster en proceso para que todo el entrenamiento del servidor de parámetros se pueda ejecutar en Colab. Aprenderá a configurar clústeres reales en una sección posterior.
Clúster en proceso
Comenzará creando varios servidores TensorFlow por adelantado y se conectará a ellos más tarde. Tenga en cuenta que esto es solo para el propósito de la demostración de este tutorial, y en el entrenamiento real, los servidores se iniciarán en máquinas "worker"
y "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
La configuración del clúster en proceso se usa con frecuencia en las pruebas unitarias, como aquí .
Otra opción para las pruebas locales es iniciar procesos en la máquina local; consulte la capacitación para varios trabajadores con Keras para ver un ejemplo de este enfoque.
Crear una instancia de ParameterServerStrategy
Antes de sumergirse en el código de entrenamiento, creemos una instancia de un objeto ParameterServerStrategy
. Tenga en cuenta que esto es necesario independientemente de si está procediendo con Model.fit
o con un ciclo de entrenamiento personalizado. El argumento variable_partitioner
se explicará en la sección Fragmentación de variables .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Para usar GPU para capacitación, asigne GPU visibles para cada trabajador. ParameterServerStrategy
usará todas las GPU disponibles en cada trabajador, con la restricción de que todos los trabajadores deben tener la misma cantidad de GPU disponibles.
Fragmentación variable
La fragmentación de variables se refiere a dividir una variable en múltiples variables más pequeñas, que se denominan fragmentos . La fragmentación variable puede ser útil para distribuir la carga de la red al acceder a estos fragmentos. También es útil para distribuir el cálculo y el almacenamiento de una variable normal entre varios servidores de parámetros.
Para habilitar la fragmentación de variables, puede pasar un variable_partitioner
al construir un objeto ParameterServerStrategy
. Se invocará variable_partitioner
cada vez que se cree una variable y se espera que devuelva el número de fragmentos a lo largo de cada dimensión de la variable. Se proporcionan algunos variable_partitioner
listos para usar, como tf.distribute.experimental.partitioners.MinSizePartitioner
. Se recomienda utilizar particiones basadas en el tamaño como tf.distribute.experimental.partitioners.MinSizePartitioner
para evitar la partición de variables pequeñas, lo que podría tener un impacto negativo en la velocidad de entrenamiento del modelo.
Cuando se pasa un variable_partitioner
y si crea una variable directamente en la strategy.scope()
, se convertirá en un tipo de contenedor con una propiedad de variables
que brinda acceso a la lista de fragmentos. En la mayoría de los casos, este contenedor se convertirá automáticamente en un tensor al concatenar todos los fragmentos. Como resultado, se puede utilizar como una variable normal. Por otro lado, algunos métodos de TensorFlow como tf.nn.embedding_lookup
brindan una implementación eficiente para este tipo de contenedor y en estos métodos se evitará la concatenación automática.
Consulte los documentos de la API de tf.distribute.experimental.ParameterServerStrategy
para obtener más detalles.
Entrenamiento con Model.fit
Keras proporciona una API de capacitación fácil de usar a través Model.fit
que maneja el ciclo de capacitación bajo el capó, con la flexibilidad de anular train_step
y devoluciones de llamada, que brindan funcionalidades como guardar puntos de control o guardar resúmenes para TensorBoard. Con Model.fit
, el mismo código de entrenamiento se puede usar para otras estrategias con un simple intercambio del objeto de la estrategia.
Datos de entrada
Model.fit
con el entrenamiento del servidor de parámetros requiere que los datos de entrada se proporcionen en un invocable que tome un solo argumento de tipo tf.distribute.InputContext
y devuelva un tf.data.Dataset
. Luego, cree un objeto tf.keras.utils.experimental.DatasetCreator
que tome dicho objeto callable
y un objeto tf.distribute.InputOptions
opcional a través del argumento input_options
.
Tenga en cuenta que se recomienda mezclar y repetir los datos con el entrenamiento del servidor de parámetros y especificar steps_per_epoch
en la llamada de fit
para que la biblioteca conozca los límites de la época.
Consulte el tutorial de entrada distribuida para obtener más información sobre el argumento InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
El código en dataset_fn
se invocará en el dispositivo de entrada, que suele ser la CPU, en cada una de las máquinas de trabajo.
Construcción y compilación de modelos.
Ahora, creará un tf.keras.Model
, un modelo tf.keras.models.Sequential
trivial con fines de demostración, seguido de una llamada Model.compile
para incorporar componentes, como un optimizador, métricas o parámetros como steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Devoluciones de llamadas y capacitación
Antes de llamar a model.fit
para la capacitación real, preparemos las devoluciones de llamada necesarias para tareas comunes, como:
-
ModelCheckpoint
: para guardar los pesos del modelo. -
BackupAndRestore
: para asegurarse de que el progreso de la capacitación se respalde automáticamente y se recupere si el clúster experimenta falta de disponibilidad (como cancelación o preferencia); o -
TensorBoard
: para guardar los informes de progreso en archivos de resumen, que se visualizan en la herramienta TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Uso directo con ClusterCoordinator
(opcional)
Incluso si elige la ruta de entrenamiento Model.fit
, puede crear una instancia opcional de un objeto tf.distribute.experimental.coordinator.ClusterCoordinator
para programar otras funciones que le gustaría que se ejecutaran en los trabajadores. Consulte la sección Entrenamiento con un bucle de entrenamiento personalizado para obtener más detalles y ejemplos.
Entrenamiento con un bucle de entrenamiento personalizado
El uso de bucles de entrenamiento personalizados con tf.distribute.Strategy
proporciona una gran flexibilidad para definir bucles de entrenamiento. Con la ParameterServerStrategy
definida anteriormente (como strategy
), utilizará un tf.distribute.experimental.coordinator.ClusterCoordinator
para enviar la ejecución de los pasos de capacitación a los trabajadores remotos.
Luego, creará un modelo, definirá un conjunto de datos y una función de paso, como lo ha hecho en el ciclo de entrenamiento con otros tf.distribute.Strategy
s. Puede encontrar más detalles en el tutorial Entrenamiento personalizado con tf.distribute.Strategy .
Para garantizar una obtención previa eficiente del conjunto de datos, use las API de creación de conjuntos de datos distribuidos recomendadas que se mencionan en la sección Pasos de capacitación de envío a trabajadores remotos a continuación. Además, asegúrese de llamar a Strategy.run
dentro de worker_fn
para aprovechar al máximo las GPU asignadas a los trabajadores. El resto de pasos son los mismos para entrenar con o sin GPU.
Vamos a crear estos componentes en los siguientes pasos:
configurar los datos
Primero, escriba una función que cree un conjunto de datos que incluya la lógica de preprocesamiento implementada por las capas de preprocesamiento de Keras .
Creará estas capas fuera de dataset_fn
pero aplicará la transformación dentro de dataset_fn
, ya que envolverá dataset_fn
en una tf.function
, que no permite que se creen variables dentro de él.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Genere ejemplos de juguetes en un conjunto de datos:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Luego, crea el conjunto de datos de entrenamiento envuelto en un dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Construye el modelo
A continuación, cree el modelo y otros objetos. Asegúrese de crear todas las variables en la strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Confirmemos que el uso de FixedShardsPartitioner
dividió todas las variables en dos fragmentos y cada fragmento se asignó a diferentes servidores de parámetros:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Definir el paso de entrenamiento
Tercero, cree el paso de entrenamiento envuelto en una tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
En la función de paso de entrenamiento anterior, llamar a Strategy.run
y Strategy.reduce
en step_fn
puede admitir varias GPU por trabajador. Si los trabajadores tienen GPU asignadas, Strategy.run
distribuirá los conjuntos de datos en varias réplicas.
Envíe los pasos de capacitación a los trabajadores remotos
Después de que ParameterServerStrategy
defina todos los cálculos, utilizará la clase tf.distribute.experimental.coordinator.ClusterCoordinator
para crear recursos y distribuir los pasos de capacitación a los trabajadores remotos.
Primero creemos un objeto ClusterCoordinator
y pasemos el objeto de estrategia:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Luego, cree un conjunto de datos por trabajador y un iterador. En per_worker_dataset_fn
continuación, se recomienda incluir strategy.distribute_datasets_from_function
dataset_fn
para permitir una captación previa eficiente a las GPU sin problemas.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
El paso final es distribuir el cómputo a los trabajadores remotos usando ClusterCoordinator.schedule
:
- El método de
schedule
pone en cola unatf.function
y devuelve unRemoteValue
similar al futuro inmediatamente. Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano yRemoteValue
se completará de forma asíncrona. - El método de
join
(ClusterCoordinator.join
) se puede usar para esperar hasta que se ejecuten todas las funciones programadas.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Así es como puede obtener el resultado de un RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
Alternativamente, puede iniciar todos los pasos y hacer algo mientras espera que se completen:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Para ver el flujo de trabajo completo de capacitación y servicio para este ejemplo en particular, consulte esta prueba .
Más sobre la creación de conjuntos de datos
El conjunto de datos del código anterior se crea mediante la API ClusterCoordinator.create_per_worker_dataset
). Crea un conjunto de datos por trabajador y devuelve un objeto contenedor. Puede llamar al método iter
para crear un iterador por trabajador. El iterador por trabajador contiene un iterador por trabajador y la porción correspondiente de un trabajador se sustituirá en el argumento de entrada de la función que se pasa al método ClusterCoordinator.schedule
antes de que la función se ejecute en un trabajador en particular.
Actualmente, el método ClusterCoordinator.schedule
asume que los trabajadores son equivalentes y, por lo tanto, asume que los conjuntos de datos de diferentes trabajadores son los mismos, excepto que se pueden barajar de manera diferente si contienen una operación Dataset.shuffle
. Debido a esto, también se recomienda que los conjuntos de datos se repitan indefinidamente y programe un número finito de pasos en lugar de depender del OutOfRangeError
de un conjunto de datos.
Otra nota importante es que los conjuntos de datos tf.data
no admiten la serialización y deserialización implícitas a través de los límites de la tarea. Por lo tanto, es importante crear todo el conjunto de datos dentro de la función que se pasa a ClusterCoordinator.create_per_worker_dataset
.
Evaluación
Hay más de una forma de definir y ejecutar un ciclo de evaluación en el entrenamiento distribuido. Cada uno tiene sus pros y sus contras, como se describe a continuación. Se recomienda el método de evaluación en línea si no tiene preferencia.
Evaluación en línea
En este método, el coordinador alterna entre capacitación y evaluación y por eso se le llama evaluación en línea .
Hay varios beneficios de la evaluación en línea. Por ejemplo:
- Puede admitir grandes modelos de evaluación y conjuntos de datos de evaluación que una sola tarea no puede contener.
- Los resultados de la evaluación se pueden utilizar para tomar decisiones para entrenar la próxima época.
Hay dos formas de implementar la evaluación en línea: evaluación directa y evaluación distribuida.
- Evaluación directa : para modelos pequeños y conjuntos de datos de evaluación, el coordinador puede ejecutar la evaluación directamente en el modelo distribuido con el conjunto de datos de evaluación en el coordinador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Evaluación distribuida : para modelos grandes o conjuntos de datos que no son factibles de ejecutar directamente en el coordinador, la tarea del coordinador puede distribuir tareas de evaluación a los trabajadores a través de los métodos
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Evaluación del sidecar
Otro método se llama evaluación de sidecar, donde crea una tarea de evaluador dedicada que lee repetidamente los puntos de control y ejecuta la evaluación en un punto de control más reciente. Permite que su programa de entrenamiento termine antes si no necesita cambiar su ciclo de entrenamiento en función de los resultados de la evaluación. Sin embargo, requiere una tarea de evaluador adicional y controles periódicos para activar la evaluación. A continuación se muestra un posible ciclo de evaluación del sidecar:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Clústeres en el mundo real
En un entorno de producción real, ejecutará todas las tareas en diferentes procesos en diferentes máquinas. La forma más sencilla de configurar la información del clúster en cada tarea es configurar las variables de entorno "TF_CONFIG"
y usar un tf.distribute.cluster_resolver.TFConfigClusterResolver
para analizar "TF_CONFIG"
.
Para obtener una descripción general sobre las variables de entorno "TF_CONFIG"
, consulte la guía de capacitación distribuida .
Si comienza sus tareas de capacitación utilizando Kubernetes u otras plantillas de configuración, es muy probable que estas plantillas ya hayan configurado “TF_CONFIG"
para usted.
Establecer la variable de entorno "TF_CONFIG"
Supongamos que tiene 3 trabajadores y 2 servidores de parámetros, el "TF_CONFIG"
del trabajador 1 puede ser:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
El "TF_CONFIG"
del evaluador puede ser:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
La parte "cluster"
en la cadena "TF_CONFIG"
anterior para el evaluador es opcional.
Si usa el mismo binario para todas las tareas
Si prefiere ejecutar todas estas tareas usando un solo binario, deberá dejar que su programa se divida en diferentes roles desde el principio:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
El siguiente código inicia un servidor TensorFlow y espera:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Manejo de fallas en la tarea
fracaso del trabajador
tf.distribute.experimental.coordinator.ClusterCoordinator
o Model.fit
brindan tolerancia a fallas integrada para fallas del trabajador. Tras la recuperación del trabajador, la función del conjunto de datos proporcionada anteriormente (ya sea a ClusterCoordinator.create_per_worker_dataset
para un bucle de entrenamiento personalizado o tf.keras.utils.experimental.DatasetCreator
para Model.fit
) se invocará en los trabajadores para volver a crear los conjuntos de datos.
Fallo del servidor de parámetros o del coordinador
Sin embargo, cuando el coordinador ve un error del servidor de parámetros, generará un error no UnavailableError
o un AbortedError
inmediato. Puede reiniciar el coordinador en este caso. El propio coordinador también puede dejar de estar disponible. Por lo tanto, se recomienda cierto utillaje para no perder el progreso del entrenamiento:
Para
Model.fit
, debe usar una devolución de llamadaBackupAndRestore
, que maneja el guardado y la restauración del progreso automáticamente. Consulte la sección Devolución de llamada y capacitación anterior para ver un ejemplo.Para un ciclo de entrenamiento personalizado, debe verificar las variables del modelo periódicamente y cargar las variables del modelo desde un punto de control, si lo hay, antes de que comience el entrenamiento. El progreso del entrenamiento se puede deducir aproximadamente de las iteraciones del
optimizer.iterations
si se marca un optimizador:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Obtener un RemoteValue
Se garantiza que la obtención de un RemoteValue
tenga éxito si una función se ejecuta correctamente. Esto se debe a que actualmente el valor de retorno se copia inmediatamente al coordinador después de ejecutar una función. Si hay algún error en el trabajador durante la copia, la función se volverá a intentar en otro trabajador disponible. Por lo tanto, si desea optimizar el rendimiento, puede programar funciones sin valor de retorno.
Error al reportar
Una vez que el coordinador ve un error como UnavailableError
de los servidores de parámetros u otros errores de la aplicación, como un argumento no tf.debugging.check_numerics
InvalidArgument
cancelará todas las funciones pendientes y en cola antes de generar el error. Obtener sus RemoteValue
correspondientes generará un CancelledError
.
Después de que se genera un error, el coordinador no generará el mismo error ni ningún error de las funciones canceladas.
Mejora del rendimiento
Hay varias razones posibles si ve problemas de rendimiento cuando entrena con ParameterServerStrategy
y ClusterResolver
.
Una razón común es que los servidores de parámetros tienen una carga desequilibrada y algunos servidores de parámetros muy cargados han alcanzado su capacidad máxima. También puede haber múltiples causas raíz. Algunos métodos simples para mitigar este problema son:
- Fragmente las variables de su modelo grande especificando un
variable_partitioner
al construir unaParameterServerStrategy
. - Si es posible, evite crear una variable de punto de acceso requerida por todos los servidores de parámetros en un solo paso. Por ejemplo, use una tasa de aprendizaje constante o subclase
tf.keras.optimizers.schedules.LearningRateSchedule
en los optimizadores, ya que el comportamiento predeterminado es que la tasa de aprendizaje se convertirá en una variable colocada en un servidor de parámetros en particular y solicitada por todos los demás servidores de parámetros en cada paso. . - Mezcla tus grandes vocabularios antes de pasarlos a las capas de preprocesamiento de Keras.
Otra posible razón de los problemas de rendimiento es el coordinador. Su primera implementación de schedule
/ join
está basada en Python y, por lo tanto, puede tener una sobrecarga de subprocesos. Además, la latencia entre el coordinador y los trabajadores puede ser grande. Si este es el caso,
Para
Model.fit
, puede establecer el argumentosteps_per_execution
proporcionado enModel.compile
en un valor mayor que 1.Para un ciclo de entrenamiento personalizado, puede empaquetar varios pasos en una sola
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
A medida que la biblioteca se optimice aún más, es de esperar que la mayoría de los usuarios no tengan que empaquetar manualmente los pasos en el futuro.
Además, un pequeño truco para mejorar el rendimiento es programar funciones sin un valor de retorno, como se explica en la sección de errores de tareas de manejo anterior.
Limitaciones conocidas
La mayoría de las limitaciones conocidas ya están cubiertas en las secciones anteriores. Esta sección proporciona un resumen.
ParameterServerStrategy
general
-
os.environment["grpc_fail_fast"]="use_caller"
es necesario en todas las tareas, incluido el coordinador, para que la tolerancia a fallas funcione correctamente. - No se admite el entrenamiento del servidor de parámetros síncrono.
- Por lo general, es necesario empaquetar varios pasos en una sola función para lograr un rendimiento óptimo.
- No se admite cargar un modelo guardado a través
tf.saved_model.load
que contiene variables fragmentadas. Tenga en cuenta que se espera que funcione la carga de un modelo guardado de este tipo con TensorFlow Serving. - No se admite cargar un punto de control que contenga variables de ranura del optimizador fragmentadas en una cantidad diferente de fragmentos.
- No se admite la recuperación de una falla del servidor de parámetros sin reiniciar la tarea del coordinador.
- El uso de
tf.lookup.StaticHashTable
(que comúnmente emplean algunas capas de preprocesamiento de Keras, comotf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
ytf.keras.layers.TextVectorization
) da como resultado recursos colocados en el coordinador en este momento con la formación del servidor de parámetros. Esto tiene implicaciones de rendimiento para la búsqueda de RPC desde los trabajadores hasta el coordinador. Esta es una alta prioridad actual para abordar.
Específicos Model.fit
- Se requiere el argumento
steps_per_epoch
enModel.fit
. Puede seleccionar un valor que proporcione intervalos apropiados en una época. -
ParameterServerStrategy
no admite devoluciones de llamada personalizadas que tengan llamadas de nivel de lote por motivos de rendimiento. Debe convertir esas llamadas en llamadas de nivel de época con los pasos seleccionadossteps_per_epoch
para que sesteps_per_epoch
cada número de pasos de pasos por época. Las devoluciones de llamada integradas no se ven afectadas: sus llamadas a nivel de lote se han modificado para que funcionen. Se está planificando admitir llamadas a nivel de lote paraParameterServerStrategy
. - Por la misma razón, a diferencia de otras estrategias, la barra de progreso y las métricas se registran solo en los límites de época.
-
run_eagerly
no es compatible.
Detalles específicos del bucle de entrenamiento personalizado
-
ClusterCoordinator.schedule
no admite garantías de visitas para un conjunto de datos. - Cuando se usa
ClusterCoordinator.create_per_worker_dataset
, se debe crear todo el conjunto de datos dentro de la función que se le pasa. -
tf.data.Options
se ignora en un conjunto de datos creado porClusterCoordinator.create_per_worker_dataset
.