tf.data.experimental.service.WorkerServer

An in-process tf.data service worker server.

A tf.data.experimental.service.WorkerServer performs tf.data.Dataset processing for user-defined datasets, and provides the resulting elements over RPC. A worker is associated with a single tf.data.experimental.service.DispatchServer.

dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

When starting a dedicated tf.data worker process, use join() to block indefinitely after starting up the server.

worker = tf.data.experimental.service.WorkerServer(
    port=5051, dispatcher_address="grpc://localhost:5050")
worker.join()

config A tf.data.experimental.service.WorkerConfig configration.
start (Optional.) Boolean, indicating whether to start the server after creating it. Defaults to True.

Methods

join

View source

Blocks until the server has shut down.

This is useful when starting a dedicated worker process.

worker_server = tf.data.experimental.service.WorkerServer(
    port=5051, dispatcher_address="grpc://localhost:5050")
worker_server.join()

This method currently blocks forever.

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while joining the server.

start

View source

Starts this server.

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while starting the server.