Alguns conjuntos de dados são grandes demais para serem processados em uma única máquina. tfds
oferece suporte à geração de dados em muitas máquinas usando Apache Beam .
Este documento tem duas seções:
- Para usuários que desejam gerar um conjunto de dados existente do Beam
- Para desenvolvedores que desejam criar um novo conjunto de dados do Beam
Gerando um conjunto de dados do Beam
Abaixo estão diferentes exemplos de geração de um conjunto de dados do Beam, tanto na nuvem quanto localmente.
No Google Cloud Dataflow
Para executar o pipeline usando o Google Cloud Dataflow e aproveitar a computação distribuída, primeiro siga as instruções de início rápido .
Depois que seu ambiente estiver configurado, você poderá executar a CLI do tfds build
usando um diretório de dados no GCS e especificando as opções necessárias para o sinalizador --beam_pipeline_options
.
Para facilitar o lançamento do script, é útil definir as seguintes variáveis usando os valores reais da configuração do GCP/GCS e do conjunto de dados que você deseja gerar:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Em seguida, você precisará criar um arquivo para instruir o Dataflow a instalar tfds
nos trabalhadores:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Se você estiver usando tfds-nightly
, certifique-se de ecoar de tfds-nightly
caso o conjunto de dados tenha sido atualizado desde o último lançamento.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Se você estiver usando dependências adicionais não incluídas na biblioteca TFDS, siga as instruções para gerenciar dependências de pipeline do Python .
Finalmente, você pode iniciar o trabalho usando o comando abaixo:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Localmente
Para executar seu script localmente usando o executor padrão do Apache Beam (ele deve caber todos os dados na memória), o comando é o mesmo que para outros conjuntos de dados:
tfds build my_dataset
Com Apache Flink
Para executar o pipeline usando Apache Flink você pode ler a documentação oficial . Certifique-se de que seu Beam seja compatível com a compatibilidade de versão do Flink
Para facilitar o lançamento do script, é útil definir as seguintes variáveis usando os valores reais da configuração do Flink e do conjunto de dados que você deseja gerar:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Para executar em um cluster Flink incorporado, você pode iniciar o trabalho usando o comando abaixo:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Com um script personalizado
Para gerar o conjunto de dados no Beam, a API é a mesma de outros conjuntos de dados. Você pode personalizar o beam.Pipeline
usando os argumentos beam_options
(e beam_runner
) de DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implementando um conjunto de dados do Beam
Pré-requisitos
Para escrever conjuntos de dados do Apache Beam, você deve estar familiarizado com os seguintes conceitos:
- Familiarize-se com o guia de criação de conjunto de dados
tfds
, pois a maior parte do conteúdo ainda se aplica aos conjuntos de dados do Beam. - Obtenha uma introdução ao Apache Beam com o guia de programação do Beam .
- Se você quiser gerar seu conjunto de dados usando o Cloud Dataflow, leia a documentação do Google Cloud e o guia de dependência do Apache Beam .
Instruções
Se você estiver familiarizado com o guia de criação de conjunto de dados , adicionar um conjunto de dados do Beam requer apenas a modificação da função _generate_examples
. A função deve retornar um objeto de feixe, em vez de um gerador:
Conjunto de dados sem feixe:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Conjunto de dados de feixe:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Todo o resto pode ser 100% idêntico, incluindo testes.
Algumas considerações adicionais:
- Use
tfds.core.lazy_imports
para importar o Apache Beam. Ao usar uma dependência lenta, os usuários ainda podem ler o conjunto de dados após ele ter sido gerado, sem precisar instalar o Beam. - Tenha cuidado com os fechamentos do Python. Ao executar o pipeline, as funções
beam.Map
ebeam.DoFn
são serializadas usandopickle
e enviadas a todos os trabalhadores. Não use objetos mutáveis dentro de umbeam.PTransform
se o estado tiver que ser compartilhado entre trabalhadores. - Devido à maneira como
tfds.core.DatasetBuilder
é serializado com pickle, a mutaçãotfds.core.DatasetBuilder
durante a criação de dados será ignorada nos trabalhadores (por exemplo, não é possível definirself.info.metadata['offset'] = 123
em_split_generators
e acesse-o dos trabalhadores comobeam.Map(lambda x: x + self.info.metadata['offset'])
) - Se precisar compartilhar algumas etapas do pipeline entre as divisões, você pode adicionar um
pipeline: beam.Pipeline
kwarg para_split_generator
e controlar o pipeline de geração completo. Consulte a documentação_generate_examples
detfds.core.GeneratorBasedBuilder
.
Exemplo
Aqui está um exemplo de um conjunto de dados do Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Executando seu pipeline
Para executar o pipeline, dê uma olhada na seção acima.
tfds build my_dataset --register_checksums
Pipeline usando TFDS como entrada
Se você deseja criar um pipeline de feixe que usa um conjunto de dados TFDS como origem, você pode usar tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Ele processará cada fragmento do conjunto de dados em paralelo.