A one-machine strategy that puts all variables on a single device.
Inherits From: Strategy
tf.distribute.experimental.CentralStorageStrategy(
compute_devices=None, parameter_device=None
)
Variables are assigned to the local CPU or, when the machine has exactly one GPU, to that GPU. If there is more than one GPU, compute operations (other than variable update operations) are replicated across all GPUs.
For example:
strategy = tf.distribute.experimental.CentralStorageStrategy()
# Create a dataset
ds = tf.data.Dataset.range(5).batch(2)
# Distribute that dataset
dist_dataset = strategy.experimental_distribute_dataset(ds)
with strategy.scope():
  @tf.function
  def train_step(val):
    return val + 1
  # Iterate over the distributed dataset
  for x in dist_dataset:
    # process dataset elements
    strategy.run(train_step, args=(x,))
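The default placement rule stated above (variables on the CPU, or on the GPU when there is exactly one) can be sketched in plain Python. This is only an illustration, assuming both `compute_devices` and `parameter_device` are left as `None`; the helper `default_placement` and the device strings are hypothetical and not part of TensorFlow.

```python
def default_placement(num_gpus):
  """Return (compute_devices, parameter_device) for a local GPU count.

  Hypothetical helper that mirrors the rule described in the text;
  it is not a TensorFlow API.
  """
  if num_gpus < 0:
    raise ValueError("num_gpus must be non-negative")
  if num_gpus == 0:
    # No GPU available: compute runs on the CPU.
    compute_devices = ["/device:CPU:0"]
  else:
    # Compute is replicated across every GPU.
    compute_devices = [f"/device:GPU:{i}" for i in range(num_gpus)]
  if len(compute_devices) == 1:
    # A single compute device also holds the variables.
    parameter_device = compute_devices[0]
  else:
    # With several GPUs, variables stay on the CPU.
    parameter_device = "/device:CPU:0"
  return compute_devices, parameter_device


for n in (0, 1, 2):
  print(n, default_placement(n))
```

With two GPUs the sketch keeps variables on the CPU while both GPUs run compute, which is the case where the strategy differs most from a mirrored setup.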
Attributes | |
---|---|
cluster_resolver | Returns the cluster resolver associated with this strategy. In general, multi-worker strategies have an associated tf.distribute.cluster_resolver.ClusterResolver, which this property returns. Single-worker strategies usually do not have a tf.distribute.cluster_resolver.ClusterResolver, in which case this property returns None. For more information, please see the tf.distribute.cluster_resolver.ClusterResolver API docstring. |
extended | tf.distribute.StrategyExtended with additional methods. |
num_replicas_in_sync | Returns number of replicas over which gradients are aggregated. |
Methods
experimental_assign_to_logical_device
experimental_assign_to_logical_device(
tensor, logical_device_id
)
Adds annotation that tensor will be assigned to a logical device.
# Initializing TPU system with 2 logical devices and 4 replicas.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
topology = tf.tpu.experimental.initialize_tpu_system(resolver)
device_assignment = tf.tpu.experimental.DeviceAssignment.build(
    topology,
    computation_shape=[1, 1, 1, 2],
    num_replicas=4)
strategy = tf.distribute.TPUStrategy(
    resolver, experimental_device_assignment=device_assignment)
# `inputs` is assumed to be a distributed dataset created elsewhere.
iterator = iter(inputs)
@tf.function()
def step_fn(inputs):
  output = tf.add(inputs, inputs)
  # Add operation will be executed on logical device 0.
  output = strategy.experimental_assign_to_logical_device(output, 0)
  return output
strategy.run(step_fn, args=(next(iterator),))
Args | |
---|---|
tensor | Input tensor to annotate. |
logical_device_id | Id of the logical core to which the tensor will be assigned. |
Raises | |
---|---|
ValueError | The logical device id presented is not consistent with total number of partitions specified by the device assignment. |
Returns | |
---|---|
Annotated tensor with identical value as tensor. |
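The `ValueError` condition can be pictured as a simple range check. The sketch below assumes that the number of logical devices per replica equals the product of `computation_shape` (so `[1, 1, 1, 2]` gives 2, matching the comment in the example above). The helpers `num_logical_devices` and `check_logical_device_id` are hypothetical names, not TensorFlow functions.

```python
import math


def num_logical_devices(computation_shape):
  """Logical devices per replica implied by a computation shape (assumed)."""
  return math.prod(computation_shape)


def check_logical_device_id(logical_device_id, computation_shape):
  """Raise ValueError when the id is outside the available partitions."""
  n = num_logical_devices(computation_shape)
  if not 0 <= logical_device_id < n:
    raise ValueError(
        f"logical_device_id {logical_device_id} is not in [0, {n})")
  return logical_device_id


shape = [1, 1, 1, 2]
print(num_logical_devices(shape))         # 2
print(check_logical_device_id(1, shape))  # 1
```

Under this assumption, ids 0 and 1 are valid for the example device assignment and any other id would be rejected.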
experimental_distribute_dataset
experimental_distribute_dataset(
dataset
)
Distributes a tf.data.Dataset instance provided via dataset.
The returned dataset is a wrapped strategy dataset that creates a multi-device iterator under the hood. It prefetches the input data to the specified devices on the worker. The returned distributed dataset can be iterated over in the same way as a regular dataset.
For example:
strategy = tf.distribute.experimental.CentralStorageStrategy() # with 1 CPU and 1 GPU
dataset = tf.data.Dataset.range(10).batch(2)
dist_dataset = strategy.experimental_distribute_dataset(dataset)
for x in dist_dataset:
  print(x) # Prints PerReplica values [0, 1], [2, 3],...
Args | |
---|---|
dataset | tf.data.Dataset to be prefetched to device. |
Returns | |
---|---|
A "distributed Dataset" that the caller can iterate over. |
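As a rough mental model of the per-replica values, the following plain-Python sketch assumes that the batch size of the input dataset is treated as the global batch size and divided among the replicas. The helper `split_global_batch` is hypothetical and only handles evenly divisible batches; how a real strategy treats uneven or partial batches is not modeled here.

```python
def split_global_batch(batch, num_replicas):
  """Split one global batch into equal per-replica slices.

  Hypothetical helper, not a TensorFlow API.
  """
  if num_replicas < 1:
    raise ValueError("num_replicas must be at least 1")
  if len(batch) % num_replicas != 0:
    raise ValueError("this sketch only handles evenly divisible batches")
  size = len(batch) // num_replicas
  return tuple(batch[i * size:(i + 1) * size] for i in range(num_replicas))


# Same element layout as tf.data.Dataset.range(10).batch(2).
global_batches = [list(range(i, i + 2)) for i in range(0, 10, 2)]

for batch in global_batches[:2]:
  print(split_global_batch(batch, 1), split_global_batch(batch, 2))
```

With one replica each global batch is delivered whole, which matches the `[0, 1], [2, 3], ...` values in the example above.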
experimental_distribute_datasets_from_function
experimental_distribute_datasets_from_function(
dataset_fn
)
Distributes tf.data.Dataset instances created by calls to dataset_fn.
dataset_fn will be called once for each worker in the strategy. In this case, we only have one worker, so dataset_fn is called once. Each replica on this worker will then dequeue a batch of elements from this local dataset.
The dataset_fn should take a tf.distribute.InputContext instance where information about batching and input replication can be accessed.
For example:
# `strategy`, `global_batch_size`, and `replica_fn` are assumed to be defined elsewhere.
def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  d = tf.data.Dataset.from_tensors([[1.]]).repeat().batch(batch_size)
  return d.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
inputs = strategy.experimental_distribute_datasets_from_function(dataset_fn)
for batch in inputs:
  replica_results = strategy.run(replica_fn, args=(batch,))
Args | |
---|---|
dataset_fn | A function taking a tf.distribute.InputContext instance and returning a tf.data.Dataset. |
Returns | |
---|---|
A "distributed Dataset", which the caller can iterate over like regular datasets. |
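The two pieces of arithmetic inside `dataset_fn` can be illustrated without TensorFlow. The sketch below assumes that `get_per_replica_batch_size` divides the global batch size evenly by the number of replicas, and that `shard(num_shards, index)` keeps the elements whose position modulo `num_shards` equals `index`. The functions `per_replica_batch_size` and `shard` are hypothetical stand-ins, not the library calls themselves.

```python
def per_replica_batch_size(global_batch_size, num_replicas_in_sync):
  """Divide a global batch size evenly among replicas."""
  if global_batch_size % num_replicas_in_sync != 0:
    raise ValueError(
        f"global batch size {global_batch_size} is not divisible by "
        f"{num_replicas_in_sync} replicas")
  return global_batch_size // num_replicas_in_sync


def shard(elements, num_shards, index):
  """Keep every num_shards-th element, starting at position index."""
  return [e for i, e in enumerate(elements) if i % num_shards == index]


batches = ["b0", "b1", "b2", "b3"]
print(per_replica_batch_size(8, 2))  # 4
print(shard(batches, 1, 0))          # ['b0', 'b1', 'b2', 'b3']
print(shard(batches, 2, 1))          # ['b1', 'b3']
```

Because this strategy has a single worker, the one-pipeline case (`num_shards=1`, `index=0`) applies and the shard step leaves the dataset unchanged.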
experimental_distribute_values_from_function
experimental_distribute_values_from_function(
value_fn
)
Generates tf.distribute.DistributedValues from value_fn.
Use this function to generate tf.distribute.DistributedValues to pass into run, reduce, or other methods that take distributed values when not using datasets.
Args | |
---|---|
value_fn | The function to run to generate values. It is called for each replica with tf.distribute.ValueContext as the sole argument. It must return a Tensor or a type that can be converted to a Tensor. |
Returns | |
---|---|
A tf.distribute.DistributedValues containing a value for each replica. |
Example usage:
- Return constant value per replica:
strategy = tf.distribute.MirroredStrategy()
def value_fn(ctx):
  return tf.constant(1.)
distributed_values = (
    strategy.experimental_distribute_values_from_function(
        value_fn))
local_result = strategy.experimental_local_results(distributed_values)
local_result
# Output: (<tf.Tensor: shape=(), dtype=float32, numpy=1.0>,)
- Distribute values in array based on replica_id:
import numpy as np
strategy = tf.distribute.MirroredStrategy()
array_value = np.array([3., 2., 1.])
def value_fn(ctx):
  return array_value[ctx.replica_id_in_sync_group]
distributed_values = (
    strategy.experimental_distribute_values_from_function(
        value_fn))
local_result = strategy.experimental_local_results(distributed_values)
local_result
# Output: (3.0,)
- Specify values using num_replicas_in_sync:
strategy = tf.distribute.MirroredStrategy()
def value_fn(ctx):
  return ctx.num_replicas_in_sync
distributed_values = (
    strategy.experimental_distribute_values_from_function(
        value_fn))
local_result = strategy.experimental_local_results(distributed_values)
local_result
# Output: (1,)
- Place values on devices and distribute:
strategy = tf.distribute.TPUStrategy()
worker_devices = strategy.extended.worker_devices
multiple_values = []
for i in range(strategy.num_replicas_in_sync):
  with tf.device(worker_devices[i]):
    multiple_values.append(tf.constant(1.0))
def value_fn(ctx):
  return multiple_values[ctx.replica_id_in_sync_group]
# Parentheses keep the wrapped call a single valid expression.
distributed_values = (
    strategy.experimental_distribute_values_from_function(
        value_fn))
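The calling pattern behind these examples, where value_fn runs once per replica with a context object as its only argument, can be imitated in plain Python. This is a simplified model only: `FakeValueContext` and `distribute_values_from_function` are hypothetical stand-ins for tf.distribute.ValueContext and the strategy method, and the result is an ordinary tuple rather than a tf.distribute.DistributedValues.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeValueContext:
  """Hypothetical stand-in exposing the two fields used in the examples."""
  replica_id_in_sync_group: int
  num_replicas_in_sync: int


def distribute_values_from_function(value_fn, num_replicas):
  """Call value_fn once per replica and collect the results in order."""
  return tuple(
      value_fn(FakeValueContext(i, num_replicas))
      for i in range(num_replicas))


array_value = [3.0, 2.0, 1.0]


def value_fn(ctx):
  # Each replica picks the entry that matches its own id.
  return array_value[ctx.replica_id_in_sync_group]


print(distribute_values_from_function(value_fn, 1))  # (3.0,)
print(distribute_values_from_function(value_fn, 3))  # (3.0, 2.0, 1.0)
```

The single-replica call reproduces the `(3.0,)` result shown above; with three replicas each one would receive a different entry of the array.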