Certains ensembles de données sont trop volumineux pour être traités sur une seule machine. tfds
prend en charge la génération de données sur de nombreuses machines à l'aide d'Apache Beam .
Ce document comporte deux sections :
- Pour les utilisateurs souhaitant générer un jeu de données Beam existant
- Pour les développeurs qui souhaitent créer un nouvel ensemble de données Beam
Générer un jeu de données Beam
Vous trouverez ci-dessous différents exemples de génération d'un ensemble de données Beam, à la fois sur le cloud ou localement.
Sur Google Cloud Dataflow
Pour exécuter le pipeline à l'aide de Google Cloud Dataflow et profiter du calcul distribué, suivez d'abord les instructions de démarrage rapide .
Une fois votre environnement configuré, vous pouvez exécuter la CLI tfds build
en utilisant un répertoire de données sur GCS et en spécifiant les options requises pour l'indicateur --beam_pipeline_options
.
Pour faciliter le lancement du script, il est utile de définir les variables suivantes à l'aide des valeurs réelles de votre configuration GCP/GCS et de l'ensemble de données que vous souhaitez générer :
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Vous devrez ensuite créer un fichier pour indiquer à Dataflow d'installer tfds
sur les nœuds de calcul :
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Si vous utilisez tfds-nightly
, assurez-vous de faire écho à tfds-nightly
au cas où l'ensemble de données aurait été mis à jour depuis la dernière version.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Si vous utilisez des dépendances supplémentaires non incluses dans la bibliothèque TFDS, suivez les instructions de gestion des dépendances du pipeline Python .
Enfin, vous pouvez lancer le travail à l'aide de la commande ci-dessous :
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Localement
Pour exécuter votre script localement à l'aide du programme d'exécution Apache Beam par défaut (il doit contenir toutes les données en mémoire), la commande est la même que pour les autres ensembles de données :
tfds build my_dataset
Avec Apache Flink
Pour exécuter le pipeline à l'aide d'Apache Flink, vous pouvez lire la documentation officielle . Assurez-vous que votre Beam est conforme à la compatibilité des versions Flink
Pour faciliter le lancement du script, il est utile de définir les variables suivantes en utilisant les valeurs réelles de votre configuration Flink et l'ensemble de données que vous souhaitez générer :
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Pour exécuter sur un cluster Flink intégré, vous pouvez lancer la tâche à l'aide de la commande ci-dessous :
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Avec un script personnalisé
Pour générer le jeu de données sur Beam, l'API est la même que pour les autres jeux de données. Vous pouvez personnaliser le beam.Pipeline
en utilisant les arguments beam_options
(et beam_runner
) de DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implémentation d'un jeu de données Beam
Conditions préalables
Afin d'écrire des ensembles de données Apache Beam, vous devez être familier avec les concepts suivants :
- Familiarisez-vous avec le guide de création de jeux de données
tfds
, car la plupart du contenu s'applique toujours aux jeux de données Beam. - Obtenez une introduction à Apache Beam avec le guide de programmation Beam .
- Si vous souhaitez générer votre ensemble de données à l'aide de Cloud Dataflow, lisez la documentation Google Cloud et le guide des dépendances Apache Beam .
Instructions
Si vous connaissez le guide de création de jeux de données , l'ajout d'un jeu de données Beam nécessite uniquement de modifier la fonction _generate_examples
. La fonction doit renvoyer un objet poutre, plutôt qu'un générateur :
Ensemble de données sans faisceau :
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Ensemble de données de poutre :
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Tout le reste peut être 100% identique, tests compris.
Quelques considérations supplémentaires :
- Utilisez
tfds.core.lazy_imports
pour importer Apache Beam. En utilisant une dépendance paresseuse, les utilisateurs peuvent toujours lire l'ensemble de données après sa génération sans avoir à installer Beam. - Soyez prudent avec les fermetures Python. Lors de l'exécution du pipeline, les fonctions
beam.Map
etbeam.DoFn
sont sérialisées à l'aidepickle
et envoyées à tous les travailleurs. N'utilisez pas d'objets mutables à l'intérieur d'unbeam.PTransform
si l'état doit être partagé entre les Workers. - En raison de la façon dont
tfds.core.DatasetBuilder
est sérialisé avec pickle, la mutationtfds.core.DatasetBuilder
lors de la création des données sera ignorée sur les travailleurs (par exemple, il n'est pas possible de définirself.info.metadata['offset'] = 123
dans_split_generators
et accédez-y à partir des travailleurs commebeam.Map(lambda x: x + self.info.metadata['offset'])
) - Si vous devez partager certaines étapes du pipeline entre les divisions, vous pouvez ajouter un
pipeline: beam.Pipeline
kwarg à_split_generator
et contrôler le pipeline de génération complet. Voir la documentation_generate_examples
detfds.core.GeneratorBasedBuilder
.
Exemple
Voici un exemple d'un ensemble de données Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Exécution de votre pipeline
Pour exécuter le pipeline, consultez la section ci-dessus.
tfds build my_dataset --register_checksums
Pipeline utilisant TFDS comme entrée
Si vous souhaitez créer un pipeline de poutres qui prend un jeu de données TFDS comme source, vous pouvez utiliser tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Il traitera chaque fragment de l'ensemble de données en parallèle.