Generowanie dużych zbiorów danych za pomocą Apache Beam

Niektóre zbiory danych są zbyt duże, aby można je było przetwarzać na jednej maszynie. tfds obsługuje generowanie danych na wielu komputerach przy użyciu Apache Beam .

Ten dokument składa się z dwóch sekcji:

  • Dla użytkownika, który chce wygenerować istniejący zbiór danych Beam
  • Dla programistów, którzy chcą utworzyć nowy zestaw danych Beam

Generowanie zestawu danych Beam

Poniżej znajdują się różne przykłady generowania zestawu danych Beam, zarówno w chmurze, jak i lokalnie.

W Google Cloud Dataflow

Aby uruchomić potok przy użyciu Google Cloud Dataflow i skorzystać z obliczeń rozproszonych, najpierw postępuj zgodnie z instrukcjami szybkiego startu .

Po skonfigurowaniu środowiska możesz uruchomić interfejs CLI tfds build , korzystając z katalogu danych w GCS i określając wymagane opcje dla flagi --beam_pipeline_options .

Aby ułatwić uruchomienie skryptu, warto zdefiniować następujące zmienne przy użyciu rzeczywistych wartości konfiguracji GCP/GCS i zbioru danych, który chcesz wygenerować:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Następnie będziesz musiał utworzyć plik, aby poinformować Dataflow o zainstalowaniu tfds na pracownikach:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Jeśli używasz tfds-nightly , pamiętaj o powtórzeniu polecenia tfds-nightly na wypadek, gdyby zbiór danych został zaktualizowany od ostatniej wersji.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Jeśli używasz dodatkowych zależności, których nie ma w bibliotece TFDS, postępuj zgodnie z instrukcjami zarządzania zależnościami potoków Pythona .

Na koniec możesz uruchomić zadanie za pomocą poniższego polecenia:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Lokalnie

Aby uruchomić skrypt lokalnie przy użyciu domyślnego modułu uruchamiającego Apache Beam (musi on zmieścić wszystkie dane w pamięci), polecenie jest takie samo jak w przypadku innych zestawów danych:

tfds build my_dataset

Aby uruchomić potok za pomocą Apache Flink, możesz przeczytać oficjalną dokumentację . Upewnij się, że Twój Beam jest zgodny z kompatybilnością wersji Flink

Aby ułatwić uruchomienie skryptu, pomocne jest zdefiniowanie następujących zmiennych przy użyciu rzeczywistych wartości konfiguracji Flink i zestawu danych, który chcesz wygenerować:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Aby uruchomić zadanie na osadzonym klastrze Flink, możesz uruchomić zadanie za pomocą poniższego polecenia:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Z niestandardowym skryptem

Aby wygenerować zbiór danych w Beam, API jest takie samo jak w przypadku innych zbiorów danych. Możesz dostosować beam.Pipeline za pomocą argumentów beam_options (i beam_runner ) DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementacja zbioru danych Beam

Warunki wstępne

Aby pisać zbiory danych Apache Beam, powinieneś znać następujące pojęcia:

Instrukcje

Jeśli znasz przewodnik tworzenia zestawu danych , dodanie zestawu danych Beam wymaga jedynie zmodyfikowania funkcji _generate_examples . Funkcja powinna zwrócić obiekt belki, a nie generator:

Zbiór danych inny niż wiązka:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Zbiór danych belki:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Cała reszta może być w 100% identyczna, łącznie z testami.

Kilka dodatkowych uwag:

  • Użyj tfds.core.lazy_imports , aby zaimportować Apache Beam. Korzystając z leniwej zależności, użytkownicy mogą nadal czytać zbiór danych po jego wygenerowaniu, bez konieczności instalowania Beama.
  • Bądź ostrożny z zamknięciami Pythona. Podczas uruchamiania potoku funkcje beam.Map i beam.DoFn są serializowane za pomocą pickle i wysyłane do wszystkich procesów roboczych. Nie używaj zmiennych obiektów wewnątrz beam.PTransform , jeśli stan ma być współdzielony między pracownikami.
  • Ze względu na sposób serializacji tfds.core.DatasetBuilder za pomocą pickle, mutowanie tfds.core.DatasetBuilder podczas tworzenia danych zostanie zignorowane na pracownikach (np. nie jest możliwe ustawienie self.info.metadata['offset'] = 123 w _split_generators i uzyskaj do niego dostęp z poziomu pracowników, takich jak beam.Map(lambda x: x + self.info.metadata['offset'])
  • Jeśli chcesz udostępnić niektóre kroki potoku pomiędzy podziałami, możesz dodać dodatkowy pipeline: beam.Pipeline kwarg do _split_generator i kontrolować potok pełnej generacji. Zobacz dokumentację _generate_examples tfds.core.GeneratorBasedBuilder .

Przykład

Oto przykład zestawu danych Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Uruchamianie rurociągu

Aby uruchomić potok, zapoznaj się z powyższą sekcją.

tfds build my_dataset --register_checksums

Potok wykorzystujący TFDS jako dane wejściowe

Jeśli chcesz utworzyć potok wiązki, który jako źródło przyjmuje zbiór danych TFDS, możesz użyć tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Będzie przetwarzać każdy fragment zbioru danych równolegle.