Parameter server training with ParameterServerStrategy

Overview

Parameter server training is a common data-parallel method to scale up model training on multiple machines.

A parameter server training cluster consists of workers and parameter servers. Variables are created on parameter servers and they are read and updated by workers in each step. By default, workers read and update these variables independently without synchronizing with each other. This is why sometimes parameter server-style training is called asynchronous training.

In TensorFlow 2, parameter server training is powered by the tf.distribute.experimental.ParameterServerStrategy class, which distributes the training steps to a cluster that scales up to thousands of workers (accompanied by parameter servers).

Supported training methods

There are two main supported training methods:

  • Training with the Keras Model.fit API, which is recommended if you prefer a high-level abstraction and handling of training.
  • Training with a custom training loop, which is recommended if you prefer to define the details of your training loop yourself.

A cluster with jobs and tasks

Regardless of the API of choice (Model.fit or a custom training loop), distributed training in TensorFlow 2 involves a 'cluster' with several 'jobs', and each of the jobs may have one or more 'tasks'.

When using parameter server training, it is recommended to have:

  • One coordinator job (which has the job name chief)
  • Multiple worker jobs (job name worker); and
  • Multiple parameter server jobs (job name ps)

While the coordinator creates resources, dispatches training tasks, writes checkpoints, and deals with task failures, workers and parameter servers run tf.distribute.Server instances that listen for requests from the coordinator.

Parameter server training with Model.fit API

Parameter server training with the Model.fit API requires the coordinator to use a tf.distribute.experimental.ParameterServerStrategy object, and a tf.keras.utils.experimental.DatasetCreator as the input. Similar to Model.fit usage with no strategy, or with other strategies, the workflow involves creating and compiling the model, preparing the callbacks, followed by a Model.fit call.
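
Put together, the coordinator-side code for this path has roughly the following shape. This is only a sketch: it assumes a cluster_resolver and a dataset_fn(input_context) that returns a tf.data.Dataset, both of which are built step by step later in this tutorial.

# A minimal sketch of the Model.fit path (assumes `cluster_resolver` and
# `dataset_fn` are defined as shown later in this tutorial).
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss="mse", steps_per_execution=10)

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
model.fit(dc, epochs=5, steps_per_epoch=20)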

Parameter server training with a custom training loop

With custom training loops, the tf.distribute.experimental.coordinator.ClusterCoordinator class is the key component used for the coordinator.

The most important API provided by the ClusterCoordinator object is schedule:

  • The schedule API enqueues a tf.function and returns a future-like RemoteValue immediately.
  • The queued functions will be dispatched to remote workers in background threads and their RemoteValues will be filled asynchronously.
  • Since schedule doesn’t require worker assignment, the tf.function passed in can be executed on any available worker.
  • If the worker it is executed on becomes unavailable before its completion, the function will be retried on another available worker.
  • Because of this fact and the fact that function execution is not atomic, a function may be executed more than once.

In addition to dispatching remote functions, the ClusterCoordinator also helps to create datasets on all the workers and rebuild these datasets when a worker recovers from failure.
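
As a rough sketch, the coordinator-side code for this path has the following shape. It assumes a strategy, a dataset_fn, and a tf.function step_fn(iterator), all of which are built step by step in the custom training loop section below.

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

# Create one dataset per worker and an iterator over them.
per_worker_dataset = coordinator.create_per_worker_dataset(dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

# `schedule` returns a future-like RemoteValue immediately.
result = coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()      # Block until all scheduled functions have finished.
print(result.fetch())   # Retrieve the value once it is available.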

Tutorial setup

The tutorial will branch into Model.fit and custom training loop paths, and you can choose the one that fits your needs. Sections other than "Training with X" are applicable to both paths.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

Cluster setup

As mentioned above, a parameter server training cluster requires a coordinator task that runs your training program, one or several workers and parameter server tasks that run TensorFlow servers—tf.distribute.Server—and possibly an additional evaluation task that runs side-car evaluation (see the side-car evaluation section below). The requirements to set them up are:

  • The coordinator task needs to know the addresses and ports of all other TensorFlow servers except the evaluator.
  • The workers and parameter servers need to know which port they need to listen to. For the sake of simplicity, you can usually pass in the complete cluster information when creating TensorFlow servers on these tasks.
  • The evaluator task doesn’t have to know the setup of the training cluster. If it does, it should not attempt to connect to the training cluster.
  • Workers and parameter servers should have task types as "worker" and "ps", respectively. The coordinator should use "chief" as the task type for legacy reasons.

In this tutorial, you will create an in-process cluster so that the whole parameter server training can be run in Colab. You will learn how to set up real clusters in a later section.

In-process cluster

You will start by creating several TensorFlow servers in advance and connect to them later. Note that this is only for the purpose of this tutorial's demonstration, and in real training the servers will be started on "worker" and "ps" machines.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_op threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

The in-process cluster setup is frequently used in unit testing, such as in TensorFlow's own tests for parameter server training.

Another option for local testing is to launch processes on the local machine—check out Multi-worker training with Keras for an example of this approach.

Instantiate a ParameterServerStrategy

Before you dive into the training code, let's instantiate a ParameterServerStrategy object. Note that this is needed regardless of whether you are proceeding with Model.fit or a custom training loop. The variable_partitioner argument will be explained in the Variable sharding section.

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

In order to use GPUs for training, allocate GPUs visible to each worker. ParameterServerStrategy will use all the available GPUs on each worker, with the restriction that all workers should have the same number of GPUs available.

Variable sharding

Variable sharding refers to splitting a variable into multiple smaller variables, which are called shards. Variable sharding may be useful to distribute the network load when accessing these shards. It is also useful to distribute computation and storage of a normal variable across multiple parameter servers.

To enable variable sharding, you can pass in a variable_partitioner when constructing a ParameterServerStrategy object. The variable_partitioner will be invoked every time a variable is created, and it is expected to return the number of shards along each dimension of the variable. Some out-of-the-box variable_partitioners are provided, such as tf.distribute.experimental.partitioners.MinSizePartitioner. It is recommended to use size-based partitioners like tf.distribute.experimental.partitioners.MinSizePartitioner to avoid partitioning small variables, which could have a negative impact on model training speed.

When a variable_partitioner is passed in and if you create a variable directly under strategy.scope(), it will become a container type with a variables property which provides access to the list of shards. In most cases, this container will be automatically converted to a Tensor by concatenating all the shards. As a result, it can be used as a normal variable. On the other hand, some TensorFlow methods such as tf.nn.embedding_lookup provide efficient implementation for this container type and in these methods automatic concatenation will be avoided.
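
For illustration (this variable is not part of the tutorial's model), a variable that is large enough to exceed min_shard_bytes and is created under strategy.scope() becomes such a container, and its shards can be inspected through the variables property:

# A 1 MB float32 variable: with the `MinSizePartitioner` above
# (min_shard_bytes=256 KB, max_shards=NUM_PS=2), it is expected to be split
# into two shards, one per parameter server.
with strategy.scope():
  sharded_v = tf.Variable(tf.zeros([256 * 1024]))

print(len(sharded_v.variables))                         # Expected: 2 shards.
print([shard.device for shard in sharded_v.variables])  # One shard per ps task.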

Please see the API docs of tf.distribute.experimental.ParameterServerStrategy for more details.

Training with Model.fit

Keras provides an easy-to-use training API via Model.fit that handles the training loop under the hood, with the flexibility of overridable train_step, and callbacks, which provide functionalities such as checkpoint saving or summary saving for TensorBoard. With Model.fit, the same training code can be used for other strategies with a simple swap of the strategy object.

Input data

Model.fit with parameter server training requires that the input data be provided in a callable that takes a single argument of type tf.distribute.InputContext, and returns a tf.data.Dataset. Then, create a tf.keras.utils.experimental.DatasetCreator object that takes such callable, and an optional tf.distribute.InputOptions object via input_options argument.

Note that it is recommended to shuffle and repeat the data with parameter server training, and to specify steps_per_epoch in the Model.fit call so the library knows the epoch boundaries.

Please see the Distributed input tutorial for more information about the InputContext argument.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

The code in dataset_fn will be invoked on the input device, which is usually the CPU, on each of the worker machines.

Model construction and compiling

Now, you will create a tf.keras.Model—a trivial tf.keras.models.Sequential model for demonstration purposes—followed by a Model.compile call to incorporate components, such as an optimizer, metrics, or parameters such as steps_per_execution:

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Callbacks and training

Before you call model.fit for the actual training, let's prepare the needed callbacks for common tasks, such as:

  • ModelCheckpoint: to save the model weights.
  • BackupAndRestore: to make sure the training progress is automatically backed up, and recovered if the cluster experiences unavailability (such as abort or preemption); or
  • TensorBoard: to save the progress reports into summary files, which can be visualized in the TensorBoard tool.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

Direct usage with ClusterCoordinator (optional)

Even if you choose the Model.fit training path, you can optionally instantiate a tf.distribute.experimental.coordinator.ClusterCoordinator object to schedule other functions you would like to be executed on the workers. See the Training with a custom training loop section for more details and examples.

Training with a custom training loop

Using custom training loops with tf.distribute.Strategy provides great flexibility to define training loops. With the ParameterServerStrategy defined above (as strategy), you will use a tf.distribute.experimental.coordinator.ClusterCoordinator to dispatch the execution of training steps to remote workers.

Then, you will create a model, define a dataset, and define a step function, as you have done in the training loop with other tf.distribute.Strategy objects. You can find more details in the Custom training with tf.distribute.Strategy tutorial.

To ensure efficient dataset prefetching, use the recommended distributed dataset creation APIs mentioned in the Dispatch training steps to remote workers section below. Also, make sure to call Strategy.run inside worker_fn to take full advantage of GPUs allocated to workers. The rest of the steps are the same for training with or without GPUs.

Let’s create these components in the following steps:

Set up the data

First, write a function that creates a dataset that includes preprocessing logic implemented by Keras preprocessing layers.

You will create these layers outside the dataset_fn but apply the transformation inside the dataset_fn, since you will wrap the dataset_fn into a tf.function, which doesn't allow variables to be created inside it.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Generate toy examples in a dataset:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Then, create the training dataset wrapped in a dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Build the model

Next, create the model and other objects. Make sure to create all variables under strategy.scope.

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Let's confirm that the variable_partitioner defined earlier split the embedding variable into two shards, and that each shard was assigned to a different parameter server:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Define the training step

Third, create the training step wrapped into a tf.function:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

In the training step function above, calling Strategy.run and Strategy.reduce in step_fn can support multiple GPUs per worker. If the workers have GPUs allocated, Strategy.run will distribute the data across multiple replicas.

Dispatch training steps to remote workers

After all the computations are defined by ParameterServerStrategy, you will use the tf.distribute.experimental.coordinator.ClusterCoordinator class to create resources and distribute the training steps to remote workers.

Let’s first create a ClusterCoordinator object and pass in the strategy object:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Then, create a per-worker dataset and an iterator. In the per_worker_dataset_fn below, wrapping the dataset_fn into strategy.distribute_datasets_from_function is recommended to allow efficient prefetching to GPUs seamlessly.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

The final step is to distribute the computation to remote workers using ClusterCoordinator.schedule:

  • The schedule method enqueues a tf.function and returns a future-like RemoteValue immediately. The queued functions will be dispatched to remote workers in background threads and the RemoteValue will be filled asynchronously.
  • The join method (ClusterCoordinator.join) can be used to wait until all scheduled functions are executed.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Here is how you can fetch the result of a RemoteValue:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternatively, you can launch all steps and do something while waiting for completion:

import time

for _ in range(total_steps):  # `total_steps` is the total number of steps you plan to schedule.
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

For the complete training and serving workflow of this particular example, please check out the corresponding test in the TensorFlow repository.

More about dataset creation

The dataset in the above code is created using the ClusterCoordinator.create_per_worker_dataset API. It creates one dataset per worker and returns a container object. You can call the iter method on it to create a per-worker iterator. The per-worker iterator contains one iterator per worker, and when the function passed to the ClusterCoordinator.schedule method is executed on a particular worker, the slice corresponding to that worker is substituted into the function's input argument.

Currently, the ClusterCoordinator.schedule method assumes workers are equivalent, and thus assumes the datasets on different workers are the same, except that they may be shuffled differently if they contain a Dataset.shuffle operation. Because of this, it is also recommended that the datasets be repeated indefinitely and that you schedule a finite number of steps instead of relying on the OutOfRangeError from a dataset.

Another important note is that tf.data datasets don’t support implicit serialization and deserialization across task boundaries. So it is important to create the whole dataset inside the function passed to ClusterCoordinator.create_per_worker_dataset.
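
In other words, build the entire tf.data.Dataset inside that function rather than creating it on the coordinator and capturing it. A minimal sketch of the recommended pattern, reusing the examples data from above (the function and variable names here are illustrative only):

# Recommended: the whole dataset is built inside the function, so each worker
# constructs its own tf.data.Dataset locally.
def build_per_worker_dataset():
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
  return raw_dataset.shuffle(200).batch(32).repeat()

another_per_worker_dataset = coordinator.create_per_worker_dataset(
    build_per_worker_dataset)

# Not recommended: creating the tf.data.Dataset on the coordinator and merely
# referencing it inside the function would require serializing the dataset
# across task boundaries, which tf.data does not support implicitly.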

Evaluation

There is more than one way to define and run an evaluation loop in distributed training. Each has its own pros and cons as described below. The inline evaluation method is recommended if you don't have a preference.

Inline evaluation

In this method, the coordinator alternates between training and evaluation, and thus it is called inline evaluation.

There are several benefits of inline evaluation. For example:

  • It can support large evaluation models and evaluation datasets that a single task cannot hold.
  • The evaluation results can be used to make decisions for training the next epoch.

There are two ways to implement inline evaluation: direct evaluation and distributed evaluation.

  • Direct evaluation: For small models and evaluation datasets, the coordinator can run evaluation directly on the distributed model with the evaluation dataset on the coordinator:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Distributed evaluation: For large models or evaluation datasets that are infeasible to run directly on the coordinator, the ClusterCoordinator object can distribute the evaluation tasks to the workers via its schedule/join methods:

with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Side-car evaluation

Another method is called side-car evaluation, where you create a dedicated evaluator task that repeatedly reads checkpoints and runs evaluation on the latest checkpoint. It allows your training program to finish early if you don't need to change your training loop based on evaluation results. However, it requires an additional evaluator task and periodic checkpointing to trigger evaluation. Following is a possible side-car evaluation loop:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # The checkpoint may have been deleted by the training job while the
    # evaluator was about to read it; skip it and wait for the next one.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clusters in the real world

In a real production environment, you will run all tasks in different processes on different machines. The simplest way to configure cluster information on each task is to set "TF_CONFIG" environment variables and use a tf.distribute.cluster_resolver.TFConfigClusterResolver to parse "TF_CONFIG".

For a general description about "TF_CONFIG" environment variables, refer to the Distributed training guide.

If you start your training tasks using Kubernetes or other configuration templates, it is very likely that these templates have already set "TF_CONFIG" for you.

Set the "TF_CONFIG" environment variable

Suppose you have 3 workers and 2 parameter servers. Then the "TF_CONFIG" of worker 1 can be:

import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

The "TF_CONFIG" of the evaluator can be:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

The "cluster" part in the above "TF_CONFIG" string for the evaluator is optional.

If you use the same binary for all tasks

If you prefer to run all these tasks using a single binary, you will need to let your program branch into different roles at the very beginning:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

The following code starts a TensorFlow server and waits:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Handling task failure

Worker failure

tf.distribute.experimental.coordinator.ClusterCoordinator or Model.fit provide built-in fault tolerance for worker failure. Upon worker recovery, the previously provided dataset function (either to ClusterCoordinator.create_per_worker_dataset for a custom training loop, or tf.keras.utils.experimental.DatasetCreator for Model.fit) will be invoked on the workers to re-create the datasets.

Parameter server or coordinator failure

When the coordinator sees a parameter server error, it will raise an UnavailableError or AbortedError immediately. You can restart the coordinator in this case. The coordinator itself can also become unavailable. Therefore, certain tooling is recommended in order not to lose the training progress:

  • For Model.fit, you should use a BackupAndRestore callback, which handles the progress saving and restoration automatically. See Callbacks and training section above for an example.

  • For a custom training loop, you should checkpoint the model variables periodically and load model variables from a checkpoint, if any, before training starts. The training progress can be inferred approximately from optimizer.iterations if an optimizer is checkpointed:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Fetching a RemoteValue

Fetching a RemoteValue is guaranteed to succeed if a function is executed successfully. This is because currently the return value is immediately copied to the coordinator after a function is executed. If there is any worker failure during the copy, the function will be retried on another available worker. Therefore, if you want to optimize for performance, you can schedule functions without a return value.

Error reporting

Once the coordinator sees an error such as an UnavailableError from parameter servers, or other application errors such as an InvalidArgumentError from tf.debugging.check_numerics, it will cancel all pending and queued functions before raising the error. Fetching their corresponding RemoteValues will raise a CancelledError.

After an error is raised, the coordinator will not raise the same error or any error from cancelled functions.
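
For example, the coordinator-side code might handle such errors along these lines. This is only a sketch reusing the step_fn and iterator defined above; the appropriate recovery logic depends on your application:

# Illustration only: handling errors surfaced by the coordinator.
try:
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
except (tf.errors.UnavailableError, tf.errors.AbortedError):
  # A parameter server became unavailable; typically you would restart the
  # coordinator and restore from the latest checkpoint.
  raise
except tf.errors.InvalidArgumentError as e:
  # An application error (for example, from tf.debugging.check_numerics).
  # Pending and queued functions have already been cancelled at this point.
  print("Training failed:", e)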

Performance improvement

There are several possible reasons why you may see performance issues when you train with ParameterServerStrategy and ClusterCoordinator.

One common reason is that the parameter servers have an unbalanced load and some heavily loaded parameter servers have reached capacity. There can also be multiple root causes. Some simple methods to mitigate this issue are to:

  1. Shard your large model variables via specifying a variable_partitioner when constructing a ParameterServerStrategy.
  2. Avoid creating a hotspot variable that is required by all parameter servers in a single step, if possible. For example, use a constant learning rate or subclass tf.keras.optimizers.schedules.LearningRateSchedule in optimizers (a sketch follows this list), since the default behavior is that the learning rate will become a variable placed on a particular parameter server and requested by all other parameter servers in each step.
  3. Shuffle your large vocabularies before passing them to Keras preprocessing layers.
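
For point 2, the following is a minimal sketch of such a schedule; the class name and decay values are arbitrary, for illustration only:

# The schedule computes the learning rate from the step counter instead of
# reading a shared learning-rate variable from a parameter server every step.
class MyExponentialDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, initial_learning_rate, decay_rate, decay_steps):
    self.initial_learning_rate = initial_learning_rate
    self.decay_rate = decay_rate
    self.decay_steps = decay_steps

  def __call__(self, step):
    return self.initial_learning_rate * tf.pow(
        self.decay_rate, tf.cast(step, tf.float32) / self.decay_steps)

optimizer = tf.keras.optimizers.RMSprop(
    learning_rate=MyExponentialDecay(0.1, 0.96, 1000))

Keras also provides built-in schedules such as tf.keras.optimizers.schedules.ExponentialDecay, which work the same way.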

Another possible reason for performance issues is the coordinator. The implementation of schedule/join is Python-based and thus may have threading overhead. Also, the latency between the coordinator and the workers can be large. If this is the case:

  • For Model.fit, you can set the steps_per_execution argument of Model.compile to a value larger than 1.

  • For a custom training loop, you can pack multiple steps into a single tf.function:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

As the library is optimized further, hopefully most users won't have to manually pack steps in the future.

In addition, a small trick for performance improvement is to schedule functions without a return value as explained in the handling task failure section above.
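
For example, under the assumption that you track the loss in a metric created under strategy.scope() instead of returning it from the step function, nothing needs to be copied back to the coordinator after each step. The names step_fn_no_return and train_loss below are illustrative only:

# A sketch of a step function without a return value. The loss is accumulated
# in a metric, which the coordinator can read at epoch boundaries instead.
with strategy.scope():
  train_loss = tf.keras.metrics.Mean(name="train_loss")

@tf.function
def step_fn_no_return(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
          reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss.update_state(loss)

  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

for _ in range(steps_per_epoch):
  coordinator.schedule(step_fn_no_return, args=(per_worker_iterator,))
coordinator.join()
print("Mean training loss:", train_loss.result().numpy())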

Known limitations

Most of the known limitations are already covered in the above sections. This section provides a summary.

ParameterServerStrategy general

  • The os.environ["GRPC_FAIL_FAST"] = "use_caller" environment variable setting is needed on every task, including the coordinator, to make fault tolerance work properly.
  • Synchronous parameter server training is not supported.
  • It is usually necessary to pack multiple steps into a single function to achieve optimal performance.
  • Loading a saved_model that contains sharded variables via tf.saved_model.load is not supported. Note that loading such a saved_model using TensorFlow Serving is expected to work.
  • It is not supported to load a checkpoint containing sharded optimizer slot variables into a different number of shards.
  • It is not supported to recover from parameter server failure without restarting the coordinator task.
  • Usage of tf.lookup.StaticHashTable (which is commonly employed by some tf.keras.layers.experimental.preprocessing layers, such as IntegerLookup, StringLookup, and TextVectorization) results in resources placed on the coordinator at this time with parameter server training. This has performance implications for lookup RPCs from workers to the coordinator. This is a current high priority to address.

Model.fit specifics

  • steps_per_epoch argument is required in Model.fit. You can select a value that provides appropriate intervals in an epoch.
  • ParameterServerStrategy does not have support for custom callbacks that have batch-level calls for performance reasons. You should convert those calls into epoch-level calls with suitably picked steps_per_epoch, so that they are called every steps_per_epoch number of steps. Built-in callbacks are not affected: their batch-level calls have been modified to be performant. Supporting batch-level calls for ParameterServerStrategy is being planned.
  • For the same reason, unlike with other strategies, the progress bar and metrics are logged only at epoch boundaries.
  • run_eagerly is not supported.

Custom training loop specifics