Some datasets are too big to be processed on a single machine. tfds supports
generating data across many machines by using
Apache Beam.
This doc has two sections:
- For users who want to generate an existing Beam dataset
- For developers who want to create a new Beam dataset
Generating a Beam dataset
Below are different examples of generating a Beam dataset, either on the cloud or locally.
On Google Cloud Dataflow
To run the pipeline using Google Cloud Dataflow and take advantage of distributed computation, first follow the Quickstart instructions.
Once your environment is set up, you can run the tfds build CLI using a data directory on GCS and specifying the required options for the --beam_pipeline_options flag.
To make it easier to launch the script, it's helpful to define the following variables using the actual values for your GCP/GCS setup and the dataset you want to generate:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
You will then need to create a file to tell Dataflow to install tfds on the workers:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
If you're using tfds-nightly, make sure to echo from tfds-nightly in case the dataset has been updated since the last release.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
If you're using additional dependencies not included in the TFDS library, follow the instructions for managing Python pipeline dependencies.
Finally, you can launch the job using the command below:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Locally
To run your script locally using the default Apache Beam runner (all the data must fit in memory), the command is the same as for other datasets:
tfds build my_dataset
With Apache Flink
To run the pipeline using Apache Flink, you can read the official documentation. Make sure your Beam version is compatible with the Flink version (see Flink Version Compatibility).
To make it easier to launch the script, it's helpful to define the following variables using the actual values for your Flink setup and the dataset you want to generate:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
To run on an embedded Flink cluster, you can launch the job using the command below:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
With a custom script
To generate the dataset on Beam, the API is the same as for other datasets. You can customize the beam.Pipeline using the beam_options (and beam_runner) arguments of DownloadConfig.
import apache_beam as beam
import tensorflow_datasets as tfds

# If you are running on Dataflow, Spark,..., you may have to set up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
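Once generation has completed, the dataset can be read back like any other TFDS dataset; Beam is not needed at read time. A minimal sketch, reusing the dataset name and data_dir from the example above:
import tensorflow_datasets as tfds

# Read the previously generated dataset from GCS.
ds = tfds.load(
    'wikipedia/20190301.en',
    split='train',
    data_dir='gs://my-gcs-bucket/tensorflow_datasets',
)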
Implementing a Beam dataset
Prerequisites
In order to write Apache Beam datasets, you should be familiar with the following concepts:
- Be familiar with the tfds dataset creation guide as most of the content still applies for Beam datasets.
- Get an introduction to Apache Beam with the Beam programming guide.
- If you want to generate your dataset using Cloud Dataflow, read the Google Cloud Documentation and the Apache Beam dependency guide.
Instructions
If you are familiar with the dataset creation guide, adding a Beam dataset only requires modifying the _generate_examples function. The function should return a Beam object, rather than a generator:
Non-beam dataset:
def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)
Beam dataset:
def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )
All the rest can be 100% identical, including tests.
Some additional considerations:
- Use tfds.core.lazy_imports to import Apache Beam. By using a lazy dependency, users can still read the dataset after it has been generated without having to install Beam.
- Be careful with Python closures. When running the pipeline, the beam.Map and beam.DoFn functions are serialized using pickle and sent to all workers. Do not use mutable objects inside a beam.PTransform if the state has to be shared across workers.
- Due to the way tfds.core.DatasetBuilder is serialized with pickle, mutating tfds.core.DatasetBuilder during data creation will be ignored on the workers (e.g. it's not possible to set self.info.metadata['offset'] = 123 in _split_generators and access it from the workers like beam.Map(lambda x: x + self.info.metadata['offset'])).
- If you need to share some pipeline steps between the splits, you can add an extra pipeline: beam.Pipeline kwarg to _split_generators and control the full generation pipeline (see the sketch after this list). See the _generate_examples documentation of tfds.core.GeneratorBasedBuilder.
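As a rough illustration of that last point, below is a minimal sketch of sharing one parsing step between two splits via the pipeline kwarg. The file names and the _parse_file helper are hypothetical placeholders, and the _split_generators method would live on your builder class:
def _parse_file(filename):
  # Hypothetical parser: yield (key, example) pairs from one file.
  yield filename, {'text': filename, 'is_train': 'train' in filename}

def _split_generators(self, dl_manager, pipeline):
  beam = tfds.core.lazy_imports.apache_beam
  # Shared step: parse the raw files once, then reuse the result for both splits.
  examples = (
      pipeline
      | 'CreateFilenames' >> beam.Create(['train-0.txt', 'test-0.txt'])
      | 'Parse' >> beam.FlatMap(_parse_file)
  )
  return {
      'train': examples | 'FilterTrain' >> beam.Filter(lambda kv: kv[1]['is_train']),
      'test': examples | 'FilterTest' >> beam.Filter(lambda kv: not kv[1]['is_train']),
  }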
Example
Here is an example of a Beam dataset.
import os

import tensorflow as tf
import tensorflow_datasets as tfds


class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )
Running your pipeline
To run the pipeline, have a look at the sections above.
tfds build my_dataset --register_checksums
Pipeline using TFDS as input
If you want to create a Beam pipeline which takes a TFDS dataset as a source, you can use tfds.beam.ReadFromTFDS:
builder = tfds.builder('my_dataset')
_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)
It will process each shard of the dataset in parallel.
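For context, here is a minimal end-to-end sketch of such a pipeline; the dataset name is a placeholder and the final print step is illustrative only:
import apache_beam as beam
import tensorflow_datasets as tfds

builder = tfds.builder('my_dataset')  # The dataset must already be generated.
with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | tfds.beam.ReadFromTFDS(builder, split='train')
      | beam.Map(tfds.as_numpy)
      | beam.Map(print)  # Replace with your own processing steps.
  )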