tf.data.experimental.service.DispatchServer

An in-process tf.data service dispatch server.

A tf.data.experimental.service.DispatchServer coordinates a cluster of tf.data.experimental.service.WorkerServers. When the workers start, they register themselves with the dispatcher.

dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(WorkerConfig(
    dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

When starting a dedicated tf.data dispatch process, use join() to block indefinitely after starting up the server.

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()

To start a DispatchServer in fault-tolerant mode, set work_dir and fault_tolerant_mode like below:

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        work_dir="gs://my-bucket/dispatcher/work_dir",
        fault_tolerant_mode=True))

config (Optional.) A tf.data.experimental.service.DispatcherConfig configration. If None, the dispatcher will use default configuration values.
start (Optional.) Boolean, indicating whether to start the server after creating it. Defaults to True.

target Returns a target that can be used to connect to the server.

dispatcher = tf.data.experimental.service.DispatchServer()
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))

The returned string will be in the form protocol://address, e.g. "grpc://localhost:5050".

Methods

join

View source

Blocks until the server has shut down.

This is useful when starting a dedicated dispatch process.

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while joining the server.

start

View source

Starts this server.

dispatcher = tf.data.experimental.service.DispatchServer(start=False)
dispatcher.start()

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while starting the server.