tf.data.experimental.service.WorkerServer
An in-process tf.data service worker server.
tf.data.experimental.service.WorkerServer(
config, start=True
)
A tf.data.experimental.service.WorkerServer performs tf.data.Dataset processing for user-defined datasets and provides the resulting elements over RPC. A worker is associated with a single tf.data.experimental.service.DispatchServer.
dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
tf.data.experimental.service.WorkerConfig(
dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
When starting a dedicated tf.data worker process, call join() after starting the worker to block until the worker terminates.
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address="localhost:5050", port=5051))
worker.join()
Call stop() to gracefully terminate the worker. The worker automatically stops when all references to it have been deleted.
Args
config: A tf.data.experimental.service.WorkerConfig configuration.
start: (Optional.) Boolean indicating whether to start the server after creating it. Defaults to True.
Methods
join
join() -> None
Blocks until the server has shut down.
This is useful when starting a dedicated worker process.
worker_server = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address="localhost:5050", port=5051))
worker_server.join()
This method currently blocks forever.
Raises
tf.errors.OpError, or one of its subclasses, if an error occurs while joining the server.
start
start() -> None
Starts this server.
Raises
tf.errors.OpError, or one of its subclasses, if an error occurs while starting the server.
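A short sketch of deferred startup (not from the official docs; the dispatcher runs in-process and the `started` flag is only for illustration): the constructor's start argument controls whether the server comes up immediately, and start() brings it online later.

```python
import tensorflow as tf

# An in-process dispatcher for the worker to register with.
dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]

# Passing start=False defers server startup; start() brings the worker
# online later, e.g. after other process initialization has finished.
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address),
    start=False)
worker.start()
started = True
worker.stop()
```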
stop
stop() -> None
Stops the server.
Raises
tf.errors.OpError, or one of its subclasses, if an error occurs while stopping the server.
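Putting the constructor and the methods above together, here is a minimal end-to-end sketch. The dispatcher runs in-process on an ephemeral port, so no fixed addresses are assumed; sorting the results makes the check order-independent.

```python
import tensorflow as tf

# Start an in-process dispatcher and derive its address from its target.
dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]

# Create the worker without starting it, then start it explicitly.
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address),
    start=False)
worker.start()

# Distribute a small dataset through the service and read it back.
dataset = tf.data.Dataset.range(5)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
elements = sorted(int(x) for x in dataset.as_numpy_iterator())
print(elements)  # [0, 1, 2, 3, 4]

# Gracefully shut down the worker when done.
worker.stop()
```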
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates. Some content is licensed under the numpy license.
Last updated 2024-04-26 UTC.