

API for using the tf.data service.

This module contains:

  1. Server implementations for running the tf.data service.
  2. APIs for registering datasets with the tf.data service and reading from the registered datasets.

The service provides the following benefits:

  • Horizontal scaling of input pipeline processing to solve input bottlenecks.
  • Data coordination for distributed training. Coordinated reads enable all replicas to train on similar-length examples across each global training step, improving step times in synchronous training.
  • Dynamic balancing of data across training replicas.
dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


Setup

This section goes over how to set up the tf.data service.

Run servers

The tf.data service consists of one dispatch server and n worker servers. tf.data servers should be brought up alongside your training jobs, then brought down when the jobs are finished. Use tf.data.experimental.service.DispatchServer to start a dispatch server, and tf.data.experimental.service.WorkerServer to start worker servers. Servers can be run in the same process for testing purposes, or scaled up on separate machines.

See the tf.data service example in the tensorflow/ecosystem repository for an example of using Google Kubernetes Engine (GKE) to manage the tf.data service. Note that the server implementation in tf.data.experimental.service is not GKE-specific, and can be used to run the tf.data service in other contexts.

Custom ops

If your dataset uses custom ops, these ops need to be made available to tf.data servers by calling tf.load_op_library from the dispatcher and worker processes at startup.


Usage

Users interact with the tf.data service by programmatically registering their datasets with the service, then creating datasets that read from the registered datasets. The register_dataset function registers a dataset, and the from_dataset_id function creates a new dataset which reads from the registered dataset. The distribute function wraps register_dataset and from_dataset_id into a single convenient transformation which registers its input dataset and then reads from it. distribute enables the tf.data service to be used with a one-line code change. However, it assumes that the dataset is created and consumed by the same entity, and this assumption might not always be valid or desirable. In particular, in certain scenarios, such as distributed training, it might be desirable to decouple the creation and consumption of the dataset (via register_dataset and from_dataset_id respectively) to avoid having to create the dataset on each of the training workers.



distribute

To use the distribute transformation, apply the transformation after the prefix of your input pipeline that you would like to be executed using the tf.data service (typically at the end).

dataset = ...  # Define your dataset here.
# Move dataset processing from the local machine to the tf.data service
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job"))
# Any transformations added after `distribute` will be run on the local machine.
dataset = dataset.prefetch(1)

The above code will create a tf.data service "job", which iterates through the dataset to generate data. To share the data from a job across multiple clients (e.g. when using TPUStrategy or MultiWorkerMirroredStrategy), set a common job_name across all clients.
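The job-sharing behavior can be sketched in plain Python (illustrative only; SharedJob, attach, and next_for are hypothetical stand-ins, not tf.data service APIs): clients that pass the same job_name attach to one job, and the job's single pass over the data is split between them rather than duplicated per client.

```python
import threading


class SharedJob:
    """Illustrative stand-in for a tf.data service job: one iteration of the
    dataset whose elements are handed out across all attached clients."""

    def __init__(self, dataset):
        self._it = iter(dataset)
        self._lock = threading.Lock()

    def next_for(self, client):
        # Each element is produced once for the whole job, not once per client.
        with self._lock:
            return next(self._it, None)


jobs = {}


def attach(job_name, dataset):
    # Clients that use the same job_name share a single job (and its data).
    if job_name not in jobs:
        jobs[job_name] = SharedJob(dataset)
    return jobs[job_name]


job_a = attach("shared_job", range(6))
job_b = attach("shared_job", range(6))  # same job_name -> same job
# The two clients split one pass over the dataset between them.
seen = [job_a.next_for("client_0"), job_b.next_for("client_1")]
```

Without a common job_name, each client would get its own job and therefore its own full iteration of the dataset.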

register_dataset and from_dataset_id

register_dataset registers a dataset with the service, returning a dataset id for the registered dataset. from_dataset_id creates a dataset that reads from the registered dataset. These APIs can be used to reduce dataset building time for distributed training. Instead of building the dataset on all training workers, we can build the dataset just once and then register the dataset using register_dataset. Then all workers can call from_dataset_id without needing to build the dataset themselves.

dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    FLAGS.tf_data_service_address, dataset)
# Use `from_dataset_id` to create per-worker datasets.
per_worker_datasets = {}
for worker in workers:
  per_worker_datasets[worker] = tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

Processing Modes

processing_mode specifies how to shard a dataset among tf.data service workers. The tf.data service supports the OFF, DYNAMIC, FILE, DATA, FILE_OR_DATA, and HINT sharding policies.

OFF: No sharding will be performed. The entire input dataset will be processed independently by each of the service workers. For this reason, it is important to shuffle data (e.g. filenames) non-deterministically, so that each worker will process the elements of the dataset in a different order. This mode can be used to distribute datasets that aren't splittable.

If a worker is added or restarted during ShardingPolicy.OFF processing, the worker will instantiate a new copy of the dataset and begin producing data from the beginning.
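The OFF semantics can be sketched in plain Python (illustrative only; worker_stream is a hypothetical helper, and a seeded shuffle stands in for the non-deterministic shuffle recommended above): every worker consumes the entire dataset, and only per-worker shuffling keeps the workers from producing identical element orders.

```python
import random

# Illustrative inputs standing in for a dataset of filenames.
filenames = [f"shard-{i}.tfrecord" for i in range(5)]


def worker_stream(worker_seed):
    # Under ShardingPolicy.OFF, each worker processes the *entire* dataset
    # independently; no elements are divided among workers.
    order = filenames[:]
    # A per-worker shuffle is what makes the workers emit different orders.
    random.Random(worker_seed).shuffle(order)
    return order


# Three workers, each producing a full (re-ordered) copy of the data.
streams = {w: worker_stream(w) for w in range(3)}
```

Note that with three workers, a client reading from all of them sees every element three times per "epoch", which is why OFF is suited to datasets that are repeated or non-splittable rather than to exact single-epoch visitation.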

Dynamic Sharding

DYNAMIC: In this mode, the tf.data service divides the dataset into two components: a source component that generates "splits" such as filenames, and a processing component that takes splits and outputs dataset elements. The source component is executed in a centralized fashion by the tf.data service dispatcher, which generates different splits of input data. The processing component is executed in a parallel fashion by the tf.data service workers, each operating on a different set of input data splits.

For example, consider the following dataset:

dataset = tf.data.Dataset.from_tensor_slices(filenames)
dataset = dataset.interleave(TFRecordDataset)
dataset = dataset.map(preprocess_fn)
dataset = dataset.batch(batch_size)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.DYNAMIC,
        service=FLAGS.tf_data_service_address))

The from_tensor_slices will be run on the dispatcher, while the interleave, map, and batch will be run on tf.data service workers. The workers will pull filenames from the dispatcher for processing. To process a dataset with dynamic sharding, the dataset must have a splittable source, and all of its transformations must be compatible with splitting. While most sources and transformations support splitting, there are exceptions, such as custom datasets which may not implement the splitting API. Please file a GitHub issue if you would like to use distributed epoch processing for a currently unsupported dataset source or transformation.

If no workers are restarted during training, dynamic sharding mode will visit every example exactly once. If workers are restarted during training, the splits they were processing will not be fully visited. The dispatcher maintains a cursor through the dataset's splits. Assuming fault tolerance is enabled (See "Fault Tolerance" below), the dispatcher will store cursor state in write-ahead logs so that the cursor can be restored in case the dispatcher is restarted mid-training. This provides an at-most-once visitation guarantee in the presence of server restarts.
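The split-dispatch and at-most-once behavior described above can be sketched in plain Python (illustrative only; Dispatcher and request_split are hypothetical names, and a deque stands in for the write-ahead-logged cursor): the dispatcher hands each split to at most one worker, and a split lost to a worker restart is never re-queued.

```python
from collections import deque


class Dispatcher:
    """Illustrative dispatcher: centrally walks the source splits (e.g.
    filenames) and hands each split to at most one requesting worker."""

    def __init__(self, splits):
        # Stands in for the dispatcher's cursor through the dataset's splits.
        self._cursor = deque(splits)

    def request_split(self):
        # Each split is handed out exactly once; a worker that restarts
        # mid-split never finishes it, hence the at-most-once guarantee.
        return self._cursor.popleft() if self._cursor else None


dispatcher = Dispatcher([f"file-{i}" for i in range(4)])

processed = []
# Two workers alternately pull splits from the centralized dispatcher.
for worker_id in (0, 1, 0, 1):
    split = dispatcher.request_split()
    if split == "file-2":
        continue  # simulate a worker restart: this split is lost, not re-queued
    processed.append(split)
```

With no restarts, every split (and so every example) is visited exactly once; the skipped split above models the partially-visited data that a restart leaves behind.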

Static Sharding

The following are static sharding policies. The semantics are similar to tf.data.experimental.AutoShardPolicy. These policies require:

  • The service cluster is configured with a fixed list of workers in DispatcherConfig.
  • Each client only reads from the local service worker.

If a worker is restarted while performing static sharding, the worker will begin processing its shard again from the beginning.
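A FILE-style static assignment can be sketched in plain Python (illustrative only; file_shard is a hypothetical helper, and round-robin striding is one plausible assignment scheme): each worker in the fixed worker list owns a fixed, disjoint subset of the input files, and a restarted worker re-reads exactly that subset from the beginning.

```python
def file_shard(filenames, num_workers):
    # Static sharding: the worker list is fixed up front, and each worker's
    # shard is a fixed, disjoint subset of the input files.
    return {w: filenames[w::num_workers] for w in range(num_workers)}


files = [f"part-{i:05d}" for i in range(6)]
shards = file_shard(files, num_workers=3)
# Shards never migrate between workers; a restarted worker starts its own
# shard over from the first file.
```

Because the assignment is fixed, a client that only reads from its local worker always sees the same shard, which is what makes these policies static.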

FILE: Shards by input files (i.e. each worker will get a fixed set of files to process). When this option is selected, make sure that there is at least as