Apache Beam ile büyük veri kümeleri oluşturma

Bazı veri kümeleri tek bir makinede işlenemeyecek kadar büyüktür. tfds Apache Beam kullanarak birçok makinede veri oluşturulmasını destekler.

Bu belgenin iki bölümü vardır:

  • Mevcut bir Beam veri kümesi oluşturmak isteyen kullanıcılar için
  • Yeni bir Beam veri kümesi oluşturmak isteyen geliştiriciler için

Beam veri kümesi oluşturma

Aşağıda hem bulutta hem de yerel olarak Beam veri kümesi oluşturmanın farklı örnekleri bulunmaktadır.

Google Cloud Dataflow'da

Ardışık düzeni Google Cloud Veri Akışı'nı kullanarak çalıştırmak ve dağıtılmış hesaplamanın avantajlarından yararlanmak için öncelikle Hızlı Başlangıç ​​talimatlarını izleyin.

Ortamınız kurulduktan sonra, GCS'deki bir veri dizinini kullanarak ve --beam_pipeline_options bayrağı için gerekli seçenekleri belirterek tfds build CLI'yi çalıştırabilirsiniz.

Komut dosyasını başlatmayı kolaylaştırmak için, GCP/GCS kurulumunuzun ve oluşturmak istediğiniz veri kümesinin gerçek değerlerini kullanarak aşağıdaki değişkenleri tanımlamanız faydalı olacaktır:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Daha sonra Dataflow'a çalışanlara tfds yüklemesini bildirmek için bir dosya oluşturmanız gerekecektir:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

tfds-nightly kullanıyorsanız, veri kümesinin son sürümden bu yana güncellenmesi ihtimaline karşı tfds-nightly yankı yaptığınızdan emin olun.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

TFDS kitaplığında yer almayan ek bağımlılıklar kullanıyorsanız Python işlem hattı bağımlılıklarını yönetme talimatlarını izleyin.

Son olarak aşağıdaki komutu kullanarak işi başlatabilirsiniz:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Yerel olarak

Komut dosyanızı varsayılan Apache Beam çalıştırıcısını kullanarak yerel olarak çalıştırmak için (tüm verileri belleğe sığdırmalıdır), komut diğer veri kümeleriyle aynıdır:

tfds build my_dataset

Boru hattını Apache Flink kullanarak çalıştırmak için resmi belgeleri okuyabilirsiniz. Beam'inizin Flink Sürüm Uyumluluğu ile uyumlu olduğundan emin olun

Komut dosyasını başlatmayı kolaylaştırmak için, Flink kurulumunuzun ve oluşturmak istediğiniz veri kümesinin gerçek değerlerini kullanarak aşağıdaki değişkenleri tanımlamanız faydalı olacaktır:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Yerleşik bir Flink kümesinde çalıştırmak için aşağıdaki komutu kullanarak işi başlatabilirsiniz:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Özel bir komut dosyasıyla

Beam'de veri kümesi oluşturmak için kullanılan API, diğer veri kümeleriyle aynıdır. beam.Pipeline öğesini, DownloadConfig öğesinin beam_options (ve beam_runner ) bağımsız değişkenlerini kullanarak özelleştirebilirsiniz.

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Beam veri kümesinin uygulanması

Önkoşullar

Apache Beam veri kümelerini yazmak için aşağıdaki kavramlara aşina olmanız gerekir:

Talimatlar

Veri kümesi oluşturma kılavuzuna aşina iseniz, Beam veri kümesi eklemek yalnızca _generate_examples işlevini değiştirmenizi gerektirir. İşlev, bir üreteç yerine bir ışın nesnesi döndürmelidir:

Işın olmayan veri kümesi:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Işın veri kümesi:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Testler dahil geri kalan her şey %100 aynı olabilir.

Bazı ek hususlar:

  • Apache Beam'i içe aktarmak için tfds.core.lazy_imports kullanın. Tembel bir bağımlılık kullanarak, kullanıcılar veri kümesini oluşturulduktan sonra Beam'i yüklemeye gerek kalmadan okuyabilirler.
  • Python kapanışlarına dikkat edin. İşlem hattını çalıştırırken, beam.Map ve beam.DoFn işlevleri pickle kullanılarak serileştirilir ve tüm çalışanlara gönderilir. Bir beam.PTransform içinde değiştirilebilir nesneler kullanmayın. Durumun çalışanlar arasında paylaşılması gerekiyorsa dönüştürün.
  • tfds.core.DatasetBuilder turşu ile serileştirilme şekli nedeniyle, veri oluşturma sırasında tfds.core.DatasetBuilder mutasyona uğraması çalışanlarda göz ardı edilecektir (örneğin, _split_generators self.info.metadata['offset'] = 123 ayarlamak mümkün değildir) ve buna beam.Map(lambda x: x + self.info.metadata['offset']) gibi işçilerden erişin
  • Bölmeler arasında bazı işlem hattı adımlarını paylaşmanız gerekiyorsa, fazladan bir işlem pipeline: beam.Pipeline _split_generator ışın.Pipeline kwarg ve tam nesil işlem hattını kontrol edebilirsiniz. tfds.core.GeneratorBasedBuilder _generate_examples belgelerine bakın.

Örnek

İşte Beam veri kümesinin bir örneği.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

İşlem hattınızı çalıştırma

Boru hattını çalıştırmak için yukarıdaki bölüme bakın.

tfds build my_dataset --register_checksums

Giriş olarak TFDS'yi kullanan işlem hattı

Kaynak olarak TFDS veri kümesini alan bir ışın hattı oluşturmak istiyorsanız tfds.beam.ReadFromTFDS öğesini kullanabilirsiniz:

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Veri kümesinin her bir parçasını paralel olarak işleyecektir.