Applies a user-provided PTransform over the whole dataset.
tft.experimental.ptransform_analyzer(
    inputs: Collection[tf.Tensor],
    ptransform: Union[_BeamPTransform, tft.experimental.CacheablePTransformAnalyzer],
    output_dtypes: Collection[tf.dtypes.DType],
    output_shapes: Collection[List[int]],
    output_asset_default_values: Optional[Collection[Optional[bytes]]] = None,
    name: Optional[str] = None
)
Note: when using Transform's APIs in compat v1 mode, the caller must add any
outputs that represent asset filenames to the tf.GraphKeys.ASSET_FILEPATHS
collection so that the asset files are copied correctly.
Example:
import tempfile
from typing import Tuple

import apache_beam as beam
import numpy as np
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

class MeanPerKey(beam.PTransform):
  def expand(
      self, pcoll: beam.PCollection[Tuple[np.ndarray, np.ndarray]]
  ) -> Tuple[beam.PCollection[np.ndarray], beam.PCollection[np.ndarray]]:
    def extract_output(key_value_pairs):
      # Split the list of (key, mean) pairs into two tagged outputs.
      keys, values = zip(*key_value_pairs)
      return [beam.TaggedOutput('keys', keys),
              beam.TaggedOutput('values', values)]
    return tuple(
        pcoll
        | 'ZipAndFlatten' >> beam.FlatMap(lambda batches: list(zip(*batches)))
        | 'MeanPerKey' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
        | 'ToList' >> beam.combiners.ToList()
        | 'Extract' >> beam.FlatMap(extract_output).with_outputs(
            'keys', 'values'))
def preprocessing_fn(inputs):
  outputs = tft.experimental.ptransform_analyzer(
      inputs=[inputs['s'], inputs['x']],
      ptransform=MeanPerKey(),
      output_dtypes=[tf.string, tf.float32],
      output_shapes=[[2], [2]])
  (keys, means) = outputs
  mean_a = tf.reshape(tf.gather(means, tf.where(keys == 'a')), [])
  return { 'x/mean_a': inputs['x'] / mean_a }
raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]
feature_spec = dict(
    x=tf.io.FixedLenFeature([], tf.float32),
    s=tf.io.FixedLenFeature([], tf.string))
raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  transformed_dataset, transform_fn = (
      (raw_data, raw_data_metadata)
      | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
transformed_data
[{'x/mean_a': 0.5}, {'x/mean_a': 4.0}, {'x/mean_a': 1.5}]
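
To see where these values come from, here is a minimal pure-Python sketch of the same computation without Beam or TensorFlow. It assumes the three rows arrive as a single batch. The names `mean_per_key` and `batches` are illustrative and not part of the tft API, and sorting the keys is a choice made here for determinism rather than something the Beam pipeline guarantees.

```python
from collections import defaultdict


def mean_per_key(batches):
    """Emulates ZipAndFlatten, MeanPerKey, ToList and Extract in plain Python.

    `batches` is a list of (keys_batch, values_batch) tuples, mirroring the
    tuple-of-batches elements that the PTransform receives.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for keys_batch, values_batch in batches:
        # Pair each key with its value, as the 'ZipAndFlatten' step does.
        for key, value in zip(keys_batch, values_batch):
            sums[key] += value
            counts[key] += 1
    # Sorted here only to make the sketch deterministic (an assumption).
    keys = tuple(sorted(sums))
    means = tuple(sums[k] / counts[k] for k in keys)
    return keys, means


raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]
# One batch holding all three rows: (batch of 's', batch of 'x').
batches = [([row['s'] for row in raw_data], [row['x'] for row in raw_data])]

keys, means = mean_per_key(batches)
mean_a = means[keys.index('a')]  # mean of x over rows with s == 'a', i.e. 2.0
transformed = [{'x/mean_a': row['x'] / mean_a} for row in raw_data]
print(transformed)
```

The mean for key 'a' is (1 + 3) / 2 = 2.0, so dividing each x by it gives 0.5, 4.0 and 1.5, matching the output above.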
Args | |
---|---|
inputs | An ordered collection of input Tensors. |
ptransform | A Beam PTransform that accepts a Beam PCollection in which each element is a tuple of ndarrays. Each ndarray in the tuple contains a batch of values for the corresponding input tensor of the analyzer; the batches maintain the shapes and dtypes of those tensors. It returns a PCollection, or a tuple of PCollections, each containing a single element that is an ndarray or a list of primitive types. The contents of these output PCollections must be consistent with the given values of output_dtypes and output_shapes. It may inherit from tft_beam.experimental.PTransformAnalyzer if access to a temp base directory is needed. Alternatively, it can be an instance of tft.experimental.CacheablePTransformAnalyzer, which enables caching for this analyzer when analyzer cache is enabled for the pipeline. |
output_dtypes | An ordered collection of TensorFlow dtypes of the output of the analyzer. |
output_shapes | An ordered collection of shapes of the output of the analyzer. Must have the same length as output_dtypes. |
output_asset_default_values | (Optional) An ordered collection of optional bytes aligned with output_dtypes/output_shapes. Every item in this collection that is not None indicates that the corresponding output is a TF asset path, and its value is used as the default value of this asset file prior to analysis. |
name | (Optional) Similar to a TF op name. Used to define a unique scope for this analyzer, which can be used for debugging info. |
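
The alignment requirements among output_dtypes, output_shapes and output_asset_default_values can be checked before calling the analyzer. The helper below is a hypothetical sketch, not part of tft, and it does not claim to reproduce whatever validation the library performs internally. Plain strings stand in for TensorFlow dtypes so the snippet runs without TensorFlow.

```python
from typing import List, Optional, Sequence


def check_analyzer_output_specs(
    output_dtypes: Sequence[object],
    output_shapes: Sequence[List[int]],
    output_asset_default_values: Optional[Sequence[Optional[bytes]]] = None,
) -> List[int]:
    """Hypothetical pre-flight check; returns indices of asset-path outputs."""
    if len(output_dtypes) != len(output_shapes):
        raise ValueError(
            f"output_dtypes has {len(output_dtypes)} entries but "
            f"output_shapes has {len(output_shapes)}")
    if output_asset_default_values is None:
        return []
    if len(output_asset_default_values) != len(output_dtypes):
        raise ValueError(
            "output_asset_default_values must align with output_dtypes")
    # A non-None default marks the output at that position as an asset path.
    return [i for i, default in enumerate(output_asset_default_values)
            if default is not None]


asset_outputs = check_analyzer_output_specs(
    output_dtypes=["string", "float32"],  # stand-ins for tf.string, tf.float32
    output_shapes=[[], [2]],
    output_asset_default_values=[b"default", None],
)
print(asset_outputs)  # [0]
```

Here only the first output carries a non-None default, so it alone would be treated as an asset path.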
Returns | |
---|---|
A list of output Tensors. These will have the dtypes and shapes specified by output_dtypes and output_shapes. |