tf.data.experimental.service.WorkerServer

An in-process tf.data service worker server.

A tf.data.experimental.service.WorkerServer performs tf.data.Dataset processing for user-defined datasets, and provides the resulting elements over RPC. A worker is associated with a single tf.data.experimental.service.DispatchServer.

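import tensorflow as tf

# Start a dispatcher on any available port.
dispatcher = tf.data.experimental.service.DispatchServer(port=0)
# dispatcher.target has the form "<protocol>://<address>"; the worker takes only the address.
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
    port=0, dispatcher_address=dispatcher_address)
dataset = tf.data.Dataset.range(10)
# Route processing of the dataset through the tf.data service.
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
# Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]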
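
The example above strips the protocol prefix from dispatcher.target before passing the result as dispatcher_address. A minimal sketch of that step as a plain-Python helper follows. The name strip_protocol is hypothetical and not part of TensorFlow, and the sample target string is made up for illustration.

```python
def strip_protocol(target: str) -> str:
    """Return the address part of a target such as "grpc://localhost:5050".

    Hypothetical helper, not a TensorFlow API. A target without a
    "://" separator is returned unchanged.
    """
    # partition() splits on the first "://" only.
    _, separator, address = target.partition("://")
    return address if separator else target


print(strip_protocol("grpc://localhost:5050"))  # localhost:5050
```

Unlike split("://")[1], this variant does not raise IndexError when the target has no protocol prefix.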

When starting a dedicated tf.data worker process, use join() to block indefinitely after starting up the server.

worker = tf.data.experimental.service.WorkerServer(
    port=5051, dispatcher_address="localhost:5050")
worker.join()

Args
port Specifies the port to bind to. A value of 0 indicates that the worker can bind to any available port.
dispatcher_address Specifies the address of the dispatcher.
worker_address (Optional.) Specifies the address of the worker server. This address is passed to the dispatcher so that the dispatcher can tell clients how to connect to this worker. Defaults to "localhost:%port%", where %port% will be replaced with the port used by the worker.
protocol (Optional.) Specifies the protocol to be used by the server. Acceptable values include "grpc" and "grpc+local". Defaults to "grpc".
start (Optional.) Boolean, indicating whether to start the server after creating it. Defaults to True.
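
The %port% placeholder in worker_address can be illustrated with a few lines of plain Python. This is only a sketch of the substitution described above, not TensorFlow's actual implementation. The helper name resolve_worker_address and the host name worker-0.example.com are hypothetical.

```python
DEFAULT_WORKER_ADDRESS = "localhost:%port%"


def resolve_worker_address(bound_port: int,
                           worker_address: str = DEFAULT_WORKER_ADDRESS) -> str:
    """Replace the %port% placeholder with the port the worker bound to.

    Hypothetical helper that mimics the documented default behavior.
    """
    return worker_address.replace("%port%", str(bound_port))


print(resolve_worker_address(5051))  # localhost:5051
print(resolve_worker_address(5051, "worker-0.example.com:%port%"))  # worker-0.example.com:5051
```

The placeholder matters most with port=0: the port is not known until the worker binds, so the address reported to the dispatcher cannot be written out in advance.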

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while creating the TensorFlow server.

Methods

join

Blocks until the server has shut down.

This is useful when starting a dedicated worker process.

worker_server = tf.data.experimental.service.WorkerServer(
    port=5051, dispatcher_address="localhost:5050")
worker_server.join()

This method currently blocks forever.

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while joining the server.
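
A dedicated worker process is typically a small script that reads its settings, creates the server, and calls join(). The sketch below assumes the constructor signature documented on this page. The flag names --port and --dispatcher_address and the functions parse_args and run_worker are hypothetical, and the default dispatcher address assumes a plain host:port form.

```python
import argparse


def parse_args(argv=None):
    """Parse the (hypothetical) command-line flags for a worker script."""
    parser = argparse.ArgumentParser(description="Run a tf.data service worker.")
    parser.add_argument("--port", type=int, default=5051,
                        help="Port to bind to; 0 picks any available port.")
    parser.add_argument("--dispatcher_address", default="localhost:5050",
                        help="Address of the dispatcher, assumed to be host:port.")
    return parser.parse_args(argv)


def run_worker(args):
    """Create the worker server and block in join()."""
    # Imported here so the flags can be parsed without loading TensorFlow.
    import tensorflow as tf

    worker = tf.data.experimental.service.WorkerServer(
        port=args.port, dispatcher_address=args.dispatcher_address)
    # Blocks until the server shuts down, which currently means forever.
    worker.join()


# In a real script the entry point would be: run_worker(parse_args())
```

Because join() does not return, such a process is normally stopped from the outside, for example by the job scheduler that launched it.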

start

Starts this server.

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while starting the server.
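
Calling start() explicitly is only needed when the server was created with start=False. A minimal sketch of that pattern, assuming the constructor arguments documented on this page; the function name create_then_start is hypothetical.

```python
def create_then_start(dispatcher_address, port=0):
    """Create a worker without starting it, then start it explicitly.

    Hypothetical wrapper; assumes the WorkerServer signature documented
    on this page (port, dispatcher_address, start).
    """
    # Imported here so that defining the function does not require TensorFlow.
    import tensorflow as tf

    worker = tf.data.experimental.service.WorkerServer(
        port=port, dispatcher_address=dispatcher_address, start=False)
    # Any setup that must finish before the worker serves requests goes here.
    worker.start()
    return worker
```

The caller should keep a reference to the returned worker for as long as it is needed.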