The ExampleGen TFX Pipeline Component

The ExampleGen TFX Pipeline component ingests data into TFX pipelines. It consumes external files or services to generate Examples that will be read by other TFX components. It also provides consistent, configurable partitioning, and shuffles the dataset in line with ML best practices.

  • Consumes: Data from external data sources such as CSV, TFRecord, Avro, Parquet and BigQuery.
  • Emits: tf.Example records, tf.SequenceExample records, or proto format, depending on the payload format.

ExampleGen and Other Components

ExampleGen provides data to components that make use of the TensorFlow Data Validation library, such as SchemaGen, StatisticsGen, and Example Validator. It also provides data to Transform, which makes use of the TensorFlow Transform library, and ultimately to deployment targets during inference.
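For a concrete sense of the wiring, here is a minimal sketch (assuming the standard tfx.components imports and a placeholder 'data_root' directory):

from tfx.components import CsvExampleGen, SchemaGen, StatisticsGen

example_gen = CsvExampleGen(input_base='data_root')  # placeholder input directory
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])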

Data Sources and Formats

Currently a standard installation of TFX includes full ExampleGen components for these data sources and formats:

  • CSV
  • TFRecord (tf.Example, tf.SequenceExample, and proto)
  • BigQuery

Custom executors are also available which enable the development of ExampleGen components for these data sources and formats:

  • Avro
  • Parquet

See the usage examples in the source code and this discussion for more information on how to use and develop custom executors.

In addition, these data sources and formats are available as custom component examples:

  • Presto

Ingesting data formats which are supported by Apache Beam

Apache Beam supports ingesting data from a broad range of data sources and formats (see below). These capabilities can be used to create custom ExampleGen components for TFX, as demonstrated by some existing ExampleGen components (see below).

How to use an ExampleGen Component

For supported data sources (currently, CSV files, TFRecord files with tf.Example, tf.SequenceExample, and proto format, and results of BigQuery queries), the ExampleGen pipeline component can be used directly in a deployment and requires little customization. For example:

from tfx.components import CsvExampleGen

example_gen = CsvExampleGen(input_base='data_root')

or like below for importing external TFRecords with tf.Example directly:

from tfx.components import ImportExampleGen

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Span, Version and Split

A Span is a grouping of training examples. If your data is persisted on a filesystem, each Span may be stored in a separate directory. The semantics of a Span are not hardcoded into TFX; a Span may correspond to a day of data, an hour of data, or any other grouping that is meaningful to your task.

Each Span can hold multiple Versions of data. To give an example, if you remove some examples from a Span to clean up poor quality data, this could result in a new Version of that Span. By default, TFX components operate on the latest Version within a Span.

Each Version within a Span can further be subdivided into multiple Splits. The most common use-case for splitting a Span is to split it into training and eval data.

[Figure: Spans and Splits]

Custom input/output split

To customize the train/eval split ratio that ExampleGen will output, set the output_config for the ExampleGen component. For example:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Notice how the hash_buckets were set in this example: each record is hashed into one of four buckets, three of which map to 'train' and one to 'eval', giving the 3:1 ratio.

For an input source which has already been split, set the input_config for the ExampleGen component:

# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                proto.Input.Split(name='train', pattern='train/*'),
                proto.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

For file-based ExampleGen (e.g., CsvExampleGen and ImportExampleGen), pattern is a glob file pattern relative to the root directory given by the input base path. For query-based ExampleGen (e.g., BigQueryExampleGen, PrestoExampleGen), pattern is a SQL query.

By default, the entire input base directory is treated as a single input split, and the train and eval output splits are generated with a 2:1 ratio.
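If spelled out, this default is equivalent to the following output configuration (a sketch; TFX applies these defaults internally):

output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=2),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))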

Refer to proto/example_gen.proto for ExampleGen's input and output split configuration, and see the downstream components guide below for utilizing the custom splits downstream.

Splitting Method

When using hash_buckets splitting method, instead of the entire record, one can use a feature for partitioning the examples. If a feature is present, ExampleGen will use a fingerprint of that feature as the partition key.

This feature can be used to maintain a stable split w.r.t. certain properties of examples: for example, a user will always be put in the same split if "user_id" were selected as the partition feature name.

The interpretation of what a "feature" means and how to match a "feature" with the specified name depends on the ExampleGen implementation and the type of the examples.

For ready-made ExampleGen implementations:

  • If it generates tf.Example, then a "feature" means an entry in tf.Example.features.feature.
  • If it generates tf.SequenceExample, then a "feature" means an entry in tf.SequenceExample.context.feature.
  • Only int64 and bytes features are supported.

In the following cases, ExampleGen throws runtime errors:

  • Specified feature name does not exist in the example.
  • Empty feature: tf.train.Feature().
  • Unsupported feature types, e.g., float features. (A valid feature is sketched below.)
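For reference, a record with a valid partition feature can be constructed as follows (a sketch; 'user_id' is just an illustrative feature name):

import tensorflow as tf

# A bytes feature named 'user_id'; int64 features work the same way.
example = tf.train.Example(features=tf.train.Features(feature={
    'user_id': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'user_42']))
}))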

To output the train/eval split based on a feature in the examples, set the output_config for the ExampleGen component. For example:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Notice how the partition_feature_name was set in this example.

Span

Span can be retrieved by using the '{SPAN}' spec in the input glob pattern:

  • This spec matches digits and maps the data into the relevant SPAN numbers. For example, 'data_{SPAN}-*.tfrecord' will collect files like 'data_12-a.tfrecord', 'data_12-b.tfrecord'.
  • Optionally, this spec can be specified with the width of the integers when mapped. For example, 'data_{SPAN:2}.file' maps to files like 'data_02.file' and 'data_27.file' (as inputs for Span-2 and Span-27 respectively), but does not map to 'data_1.file' nor 'data_123.file'.
  • When the SPAN spec is missing, the span is always assumed to be '0'.
  • If SPAN is specified, the pipeline will process the latest span and store the span number in metadata.

For example, let's assume there are input data:

  • '/tmp/span-1/train/data'
  • '/tmp/span-1/eval/data'
  • '/tmp/span-2/train/data'
  • '/tmp/span-2/eval/data'

and the input config is shown below:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

When triggering the pipeline, it will process:

  • '/tmp/span-2/train/data' as train split
  • '/tmp/span-2/eval/data' as eval split

with span number '2'. If later on '/tmp/span-3/...' is ready, simply trigger the pipeline again and it will pick up span '3' for processing. Below is the code example for using the SPAN spec:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                  pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                  pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Retrieving a certain span can be done with RangeConfig, which is detailed below.

Date

If your data source is organized on the filesystem by date, TFX supports mapping dates directly to span numbers. There are three specs to represent the mapping from dates to spans: {YYYY}, {MM} and {DD}:

  • The three specs must all be present in the input glob pattern if any one of them is specified.
  • Either the {SPAN} spec or this set of date specs can be specified, but not both.
  • A calendar date with the year from YYYY, the month from MM, and the day of the month from DD is calculated, and the span number is then calculated as the number of days since the Unix epoch (i.e. 1970-01-01). For example, 'log-{YYYY}{MM}{DD}.data' matches a file 'log-19700101.data' and consumes it as input for Span-0, and 'log-20170101.data' as input for Span-17167 (this arithmetic is verified in the sketch after this list).
  • If this set of date specs is specified, the pipeline will process the latest date and store the corresponding span number in metadata.
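The date-to-span arithmetic can be sanity-checked with the standard library:

from datetime import date

# Span number = days since the Unix epoch (1970-01-01).
assert (date(1970, 1, 1) - date(1970, 1, 1)).days == 0      # Span-0
assert (date(2017, 1, 1) - date(1970, 1, 1)).days == 17167  # Span-17167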

For example, let's assume there are input data organized by calendar date:

  • '/tmp/1970-01-02/train/data'
  • '/tmp/1970-01-02/eval/data'
  • '/tmp/1970-01-03/train/data'
  • '/tmp/1970-01-03/eval/data'

and the input config is shown below:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

When triggering the pipeline, it will process:

  • '/tmp/1970-01-03/train/data' as train split
  • '/tmp/1970-01-03/eval/data' as eval split

with span number '2' (1970-01-03 is two days after the epoch). If later on '/tmp/1970-01-04/...' is ready, simply trigger the pipeline again and it will pick up span '3' for processing. Below is the code example for using the date spec:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                  pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                  pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Version

Version can be retrieved by using the '{VERSION}' spec in the input glob pattern:

  • This spec matches digits and maps the data to the relevant VERSION numbers under the SPAN. Note that the Version spec can be used in combination with either the Span or Date spec.
  • This spec can also be optionally specified with the width in the same way as SPAN spec. e.g. 'span-{SPAN}/version-{VERSION:4}/data-*'.
  • When the VERSION spec is missing, the version is set to None.
  • If SPAN and VERSION are both specified, the pipeline will process the latest version of the latest span, and store the version number in metadata.
  • If VERSION is specified without SPAN (or the date spec), an error will be thrown.

For example, let's assume there are input data:

  • '/tmp/span-1/ver-1/train/data'
  • '/tmp/span-1/ver-1/eval/data'
  • '/tmp/span-2/ver-1/train/data'
  • '/tmp/span-2/ver-1/eval/data'
  • '/tmp/span-2/ver-2/train/data'
  • '/tmp/span-2/ver-2/eval/data'

and the input config is shown below:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

When triggering the pipeline, it will process:

  • '/tmp/span-2/ver-2/train/data' as train split
  • '/tmp/span-2/ver-2/eval/data' as eval split

with span number '2' and version number '2'. If later on '/tmp/span-2/ver-3/...' is ready, simply trigger the pipeline again and it will pick up span '2' and version '3' for processing. Below is the code example for using the VERSION spec:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                  pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                  pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Range Config

TFX supports retrieval and processing of a specific span in file-based ExampleGen using range config, an abstract config used to describe ranges for different TFX entities. To retrieve a specific span, set the range_config for a file-based ExampleGen component. For example, let's assume there are input data:

  • '/tmp/span-01/train/data'
  • '/tmp/span-01/eval/data'
  • '/tmp/span-02/train/data'
  • '/tmp/span-02/eval/data'

To specifically retrieve and process data with span '1', we specify a range config in addition to the input config. Note that ExampleGen only supports single-span static ranges (to specify processing of specific individual spans). Thus, for StaticRange, start_span_number must equal end_span_number. Using the provided span, and the span width information (if provided) for zero-padding, ExampleGen will replace the SPAN spec in the provided split patterns with the desired span number. An example of usage is shown below:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                  pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                  pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Range config can also be used to process specific dates, if the date spec is used instead of SPAN spec. For example, let's assume there are input data organized by calendar date:

  • '/tmp/1970-01-02/train/data'
  • '/tmp/1970-01-02/eval/data'
  • '/tmp/1970-01-03/train/data'
  • '/tmp/1970-01-03/eval/data'

To specifically retrieve and process data on January 2nd, 1970, we do the following:

from tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                  pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                  pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify the date, converted to a span number, to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Custom ExampleGen

If the currently available ExampleGen components don't fit your needs, you can create a custom ExampleGen, which will enable you to read from different data sources or in different data formats.

File-Based ExampleGen Customization (Experimental)

First, extend BaseExampleGenExecutor with a custom Beam PTransform, which provides the conversion from your train/eval input split to TF examples. For example, the CsvExampleGen executor provides the conversion from an input CSV split to TF examples.

Then, create a component with the above executor, as done in the CsvExampleGen component. Alternatively, pass a custom executor into the standard FileBasedExampleGen component as shown below.

import os

from tfx.components.base import executor_spec
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Reading Avro and Parquet files is also supported using this method, as sketched below.
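For example, wiring the bundled Avro custom executor into FileBasedExampleGen might look like the following sketch (the input path is a placeholder):

from tfx.components.base import executor_spec
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import avro_executor

# Read Avro files under the (placeholder) input directory as tf.Example records.
example_gen = FileBasedExampleGen(
    input_base='/path/to/avro_dir',
    custom_executor_spec=executor_spec.ExecutorClassSpec(avro_executor.Executor))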

Additional Data Formats

Apache Beam supports reading a number of additional data formats through Beam I/O Transforms. You can create custom ExampleGen components by leveraging the Beam I/O Transforms, using a pattern similar to the Avro example:

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))
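For context, a complete file-based executor built around that snippet, modeled on the Avro custom executor bundled with TFX, might be sketched as follows (the 'input_base' exec-property key and import paths are assumptions that may vary across TFX versions):

import os

import apache_beam as beam
import tensorflow as tf
from tfx.components.example_gen import utils
from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor


@beam.ptransform_fn
@beam.typehints.with_input_types(beam.Pipeline)
@beam.typehints.with_output_types(tf.train.Example)
def _AvroToExample(pipeline, exec_properties, split_pattern):
  # Resolve the split's glob pattern against the input base directory,
  # read Avro records as dicts, and convert each dict to tf.Example.
  avro_pattern = os.path.join(exec_properties['input_base'], split_pattern)
  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))


class Executor(BaseExampleGenExecutor):
  # Plug the conversion PTransform into the base executor.
  def GetInputSourceToExamplePTransform(self):
    return _AvroToExample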

As of this writing, the formats and data sources supported by the Beam Python SDK include:

  • Amazon S3
  • Apache Avro
  • Apache Hadoop
  • Apache Kafka
  • Apache Parquet
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Google Cloud Datastore
  • Google Cloud Pub/Sub
  • Google Cloud Storage (GCS)
  • MongoDB

Check the Beam docs for the latest list.

Query-Based ExampleGen Customization (Experimental)

First, extend BaseExampleGenExecutor with a custom Beam PTransform, which reads from the external data source. Then, create a simple component by extending QueryBasedExampleGen.
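As a rough sketch (Beam's BigQuery connector stands in for the external source; _row_to_example is a hypothetical helper, and for query-based ExampleGen the split pattern carries the SQL query), the conversion PTransform could look like:

import apache_beam as beam
import tensorflow as tf


def _row_to_example(row):
  # Hypothetical helper: convert one query result row (a dict) to tf.Example.
  # Simplified to int and string-like values only.
  feature = {}
  for key, value in row.items():
    if isinstance(value, int):
      feature[key] = tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
    else:
      feature[key] = tf.train.Feature(
          bytes_list=tf.train.BytesList(value=[str(value).encode('utf-8')]))
  return tf.train.Example(features=tf.train.Features(feature=feature))


@beam.ptransform_fn
def _QueryToExample(pipeline, exec_properties, split_pattern):
  # For query-based ExampleGen, split_pattern is the SQL query.
  return (pipeline
          | 'Query' >> beam.io.ReadFromBigQuery(query=split_pattern,
                                                use_standard_sql=True)
          | 'ToTFExample' >> beam.Map(_row_to_example))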

This may or may not require additional connection configuration. For example, the BigQuery executor reads using a default beam.io connector, which abstracts the connection configuration details. The Presto executor, by contrast, requires a custom Beam PTransform and a custom connection configuration protobuf as input.

If a connection configuration is required for a custom ExampleGen component, create a new protobuf and pass it in through custom_config, which is an optional execution parameter. Below is an example of how to use a configured component.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

ExampleGen Downstream Components

Custom split configuration is supported for downstream components.

StatisticsGen

Default behavior is to perform statistics generation for all splits.

To exclude any splits, set the exclude_splits for the StatisticsGen component. For example:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

Default behavior is to generate a schema based on all splits.

To exclude any splits, set the exclude_splits for the SchemaGen component. For example:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

ExampleValidator

Default behavior is to validate the statistics of all splits on input examples against a schema.

To exclude any splits, set the exclude_splits for the ExampleValidator component. For example:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Transform

Default behavior is to analyze and produce the metadata from the 'train' split and transform all splits.

To specify the analyze splits and transform splits, set the splits_config for the Transform component. For example:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                       transform=['train', 'eval']))

Trainer and Tuner

Default behavior is train on the 'train' split and evaluate on the 'eval' split.

To specify the train splits and evaluate splits, set the train_args and eval_args for the Trainer component. For example:

# Train on the 'train' split and evaluate on the 'eval' split.
trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Evaluator

Default behavior is to provide metrics computed on the 'eval' split.

To compute evaluation statistics on custom splits, set the example_splits for the Evaluator component. For example:

# Compute metrics on the 'eval1' split and the 'eval2' split.
evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

More details are available in the CsvExampleGen API reference, the FileBasedExampleGen API implementation, and the ImportExampleGen API reference.