tft.ptransform_analyzer

Applies a user-provided PTransform over the whole dataset.

Note: when using Transform's APIs in compat v1 mode, the caller must add any outputs that represent asset filenames to the tf.GraphKeys.ASSET_FILEPATHS collection. Otherwise the asset files will not be copied correctly.
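The registration step described in the note can be sketched as follows. This is a minimal illustration, assuming TensorFlow 2.x with the `tf.compat.v1` collection APIs available. The tensor `vocab_path` and the path `/tmp/vocab.txt` are hypothetical stand-ins for an analyzer output that holds an asset filename.

```python
import tensorflow as tf

# Build a v1-style graph explicitly so that graph collections are available.
graph = tf.Graph()
with graph.as_default():
    # Hypothetical stand-in for an analyzer output holding an asset filename.
    vocab_path = tf.constant("/tmp/vocab.txt")

    # Register the tensor so the asset file it names can be tracked.
    tf.compat.v1.add_to_collection(
        tf.compat.v1.GraphKeys.ASSET_FILEPATHS, vocab_path)

    # Read the collection back to confirm the registration.
    registered = tf.compat.v1.get_collection(
        tf.compat.v1.GraphKeys.ASSET_FILEPATHS)
```

In a real preprocessing_fn the registered tensor would be one of the tensors returned by the analyzer rather than a constant.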

Example:

import tempfile

import apache_beam as beam
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam


class MeanPerKey(beam.PTransform):
  def expand(self, pcoll):
    # Returning a single PCollection since this analyzer has 1 output.
    return (pcoll
            # Each element is a batch (keys, values); emit one (key, value) pair per row.
            | 'TuplesOfArraysToTuples' >> beam.FlatMap(lambda kv: list(zip(*kv)))
            | 'MeanPerKey' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
            | 'ToList' >> beam.combiners.ToList()
            # Sort by key and keep only the means, so the output order is stable.
            | 'ExtractMeans' >>
            beam.Map(lambda outputs: [v for _, v in sorted(outputs)]))


def preprocessing_fn(inputs):
  outputs = tft.ptransform_analyzer(
      inputs=[inputs['s'], inputs['x']],
      output_dtypes=[tf.float32],
      output_shapes=[[2]],
      ptransform=MeanPerKey())
  (mean_per_key,) = outputs
  # mean_per_key[0] is the mean of x for the first key in sorted order, 'a'.
  return {'x/mean_a': inputs['x'] / mean_per_key[0]}


raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]
feature_spec = dict(
    x=tf.io.FixedLenFeature([], tf.float32),
    s=tf.io.FixedLenFeature([], tf.string))
raw_data_metadata = tft.tf_metadata.dataset_metadata.DatasetMetadata(
    tft.tf_metadata.schema_utils.schema_from_feature_spec(feature_spec))

with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  transformed_dataset, transform_fn = (
      (raw_data, raw_data_metadata)
      | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

# transformed_data evaluates to:
# [{'x/mean_a': 0.5}, {'x/mean_a': 4.0}, {'x/mean_a': 1.5}]
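To see where these numbers come from, the same computation can be traced in plain Python without Beam or TensorFlow. This is only an illustrative sketch: it assumes the whole dataset arrives as a single batch and uses lists in place of ndarrays. The names `batch`, `grouped`, and `result` are introduced here for the illustration and are not part of the API.

```python
from collections import defaultdict

# One batch as the PTransform would see it: one sequence per input tensor,
# in the order given to `inputs` (keys from 's', then values from 'x').
batch = (["a", "b", "a"], [1.0, 8.0, 3.0])

# 'TuplesOfArraysToTuples': zip(*kv) pairs each key with its value.
pairs = list(zip(*batch))

# 'MeanPerKey': group the values by key and average each group.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)
means = [(key, sum(values) / len(values)) for key, values in grouped.items()]

# 'ToList' and 'ExtractMeans': sort by key and keep only the means.
mean_per_key = [mean for _, mean in sorted(means)]

# preprocessing_fn: divide every x by the mean for the first key, 'a'.
result = [{"x/mean_a": x / mean_per_key[0]} for x in batch[1]]
print(result)
```

The mean for key 'a' is (1 + 3) / 2 = 2 and the mean for 'b' is 8, so the analyzer output has shape [2], matching output_shapes=[[2]] in the example, and each x is divided by 2.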

Args:

inputs: A list of input Tensors.
output_dtypes: The list of TensorFlow dtypes of the outputs of the analyzer.
output_shapes: The list of shapes of the outputs of the analyzer. Must have the same length as output_dtypes.
ptransform: A Beam PTransform that accepts a Beam PCollection where each element is a list of ndarrays. Each ndarray in the list contains a batch of values for the corresponding input tensor of the analyzer. It returns a tuple of PCollections, each containing a single element which is an ndarray.
name: (Optional) Similar to a TF op name. Used to define a unique scope for this analyzer, which can be used for debugging info.

Returns:

A list of output Tensors. These will have dtype and shape as specified by output_dtypes and output_shapes.

Raises:

ValueError: If output_dtypes and output_shapes have different lengths.
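The length requirement on output_dtypes and output_shapes can be pictured with a small stand-alone check. This is a hypothetical helper written for illustration, not the library's implementation. The name `check_output_specs` and the use of strings in place of TensorFlow dtypes are assumptions.

```python
def check_output_specs(output_dtypes, output_shapes):
    """Hypothetical helper mirroring the documented length requirement."""
    if len(output_dtypes) != len(output_shapes):
        raise ValueError(
            "output_dtypes has %d entries but output_shapes has %d; "
            "each analyzer output needs exactly one dtype and one shape"
            % (len(output_dtypes), len(output_shapes)))
    # Pair each dtype with its shape, one pair per analyzer output.
    return list(zip(output_dtypes, output_shapes))


# One output of shape [2], as in the example above (dtype given as a string here).
specs = check_output_specs(["float32"], [[2]])

# Mismatched lengths are rejected.
try:
    check_output_specs(["float32", "int64"], [[2]])
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
```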