Tạo tập dữ liệu lớn với Apache Beam

Một số bộ dữ liệu quá lớn để có thể xử lý trên một máy. tfds hỗ trợ tạo dữ liệu trên nhiều máy bằng cách sử dụng Apache Beam .

Tài liệu này có hai phần:

  • Đối với người dùng muốn tạo tập dữ liệu Beam hiện có
  • Dành cho các nhà phát triển muốn tạo tập dữ liệu Beam mới

Tạo tập dữ liệu Beam

Dưới đây là các ví dụ khác nhau về cách tạo tập dữ liệu Beam, cả trên đám mây hoặc cục bộ.

Trên Google Cloud Dataflow

Để chạy quy trình bằng Google Cloud Dataflow và tận dụng tính toán phân tán, trước tiên hãy làm theo hướng dẫn Bắt đầu nhanh.

Sau khi môi trường của bạn được thiết lập, bạn có thể chạy CLI tfds build bằng cách sử dụng thư mục dữ liệu trên GCS và chỉ định các tùy chọn bắt buộc cho cờ --beam_pipeline_options .

Để khởi chạy tập lệnh dễ dàng hơn, sẽ hữu ích khi xác định các biến sau bằng cách sử dụng các giá trị thực tế cho thiết lập GCP / GCS của bạn và tập dữ liệu bạn muốn tạo:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Sau đó, bạn sẽ cần tạo một tệp để yêu cầu Dataflow cài đặt tfds trên worker:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Nếu bạn đang sử dụng tfds-nightly , hãy đảm bảo gửi lại từ tfds-nightly trong trường hợp tập dữ liệu đã được cập nhật kể từ lần phát hành cuối cùng.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Cuối cùng, bạn có thể khởi chạy công việc bằng lệnh dưới đây:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Cục bộ

Để chạy tập lệnh của bạn cục bộ bằng trình chạy Apache Beam mặc định, lệnh tương tự như đối với các tập dữ liệu khác:

tfds build my_dataset

Với một tập lệnh tùy chỉnh

Để tạo tập dữ liệu trên Beam, API cũng giống như đối với các tập dữ liệu khác. Bạn có thể tùy chỉnh beam.Pipeline bằng cách sử dụng các đối số beam_options (và beam_runner ) của DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Triển khai tập dữ liệu Beam

Điều kiện tiên quyết

Để viết tập dữ liệu Apache Beam, bạn nên nắm rõ các khái niệm sau:

Hướng dẫn

Nếu bạn đã quen với hướng dẫn tạo tập dữ liệu , việc thêm tập dữ liệu Beam chỉ yêu cầu sửa đổi hàm _generate_examples . Hàm sẽ trả về một đối tượng chùm chứ không phải một trình tạo:

Bộ dữ liệu không chùm:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Bộ dữ liệu chùm:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Tất cả các phần còn lại có thể giống nhau 100%, bao gồm cả các bài kiểm tra.

Một số cân nhắc bổ sung:

  • Sử dụng tfds.core.lazy_imports để nhập Apache Beam. Bằng cách sử dụng phụ thuộc lười biếng, người dùng vẫn có thể đọc tập dữ liệu sau khi nó đã được tạo mà không cần phải cài đặt Beam.
  • Hãy cẩn thận với việc đóng Python. Khi chạy đường ống, các chức năng beam.Mapbeam.DoFn được tuần tự hóa bằng cách sử dụng pickle và gửi đến tất cả công nhân. Không biến đổi các đối tượng bên trong beam.PTransform nếu trạng thái phải được chia sẻ giữa các worker.
  • Do cách tfds.core.DatasetBuilder được tuần tự hóa với pickle, việc thay đổi tfds.core.DatasetBuilder trong quá trình tạo dữ liệu sẽ bị bỏ qua trên worker (ví dụ: không thể đặt self.info.metadata['offset'] = 123 trong _split_generators và truy cập nó từ các worker như beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Nếu bạn cần chia sẻ một số bước đường ống giữa các phần, bạn có thể thêm một pipeline: beam.Pipeline kwarg vào _split_generator và kiểm soát đường ống tạo đầy đủ. Xem tài liệu _generate_examples của tfds.core.GeneratorBasedBuilder .

Thí dụ

Đây là một ví dụ về tập dữ liệu Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Chạy đường ống của bạn

Để chạy đường ống, hãy xem phần trên.

tfds build my_dataset --register_checksums

Đường ống sử dụng TFDS làm đầu vào

Nếu bạn muốn tạo đường ống chùm lấy tập dữ liệu TFDS làm nguồn, bạn có thể sử dụng tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Nó sẽ xử lý song song từng phân đoạn của tập dữ liệu.