Tạo bộ dữ liệu lớn với Apache Beam

Một số bộ dữ liệu quá lớn để có thể xử lý trên một máy. tfds hỗ trợ tạo dữ liệu trên nhiều máy bằng cách sử dụng Apache Beam .

Tài liệu này có hai phần:

  • Dành cho người dùng muốn tạo tập dữ liệu Beam hiện có
  • Dành cho các nhà phát triển muốn tạo tập dữ liệu Beam mới

Tạo tập dữ liệu Beam

Dưới đây là các ví dụ khác nhau về cách tạo tập dữ liệu Beam, cả trên đám mây hoặc cục bộ.

Trên luồng dữ liệu đám mây của Google

Để chạy quy trình bằng Google Cloud Dataflow và tận dụng tính toán phân tán, trước tiên hãy làm theo hướng dẫn Bắt đầu nhanh .

Sau khi thiết lập môi trường, bạn có thể chạy tfds build CLI bằng cách sử dụng thư mục dữ liệu trên GCS và chỉ định các tùy chọn bắt buộc cho cờ --beam_pipeline_options .

Để giúp khởi chạy tập lệnh dễ dàng hơn, bạn nên xác định các biến sau bằng cách sử dụng các giá trị thực tế cho thiết lập GCP/GCS và tập dữ liệu bạn muốn tạo:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Sau đó, bạn sẽ cần tạo một tệp để yêu cầu Dataflow cài đặt tfds trên các trình chạy:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Nếu bạn đang sử dụng tfds-nightly , hãy đảm bảo lặp lại từ tfds-nightly trong trường hợp tập dữ liệu đã được cập nhật kể từ lần phát hành cuối cùng.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Nếu bạn đang sử dụng các phần phụ thuộc bổ sung không có trong thư viện TFDS, hãy làm theo hướng dẫn để quản lý các phần phụ thuộc đường dẫn Python .

Cuối cùng, bạn có thể khởi chạy công việc bằng lệnh bên dưới:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Tại địa phương

Để chạy cục bộ tập lệnh của bạn bằng trình chạy Apache Beam mặc định (nó phải vừa với tất cả dữ liệu trong bộ nhớ), lệnh này cũng giống như đối với các bộ dữ liệu khác:

tfds build my_dataset

Để chạy quy trình bằng Apache Flink, bạn có thể đọc tài liệu chính thức . Đảm bảo Beam của bạn tuân thủ Khả năng tương thích phiên bản Flink

Để giúp khởi chạy tập lệnh dễ dàng hơn, bạn nên xác định các biến sau bằng cách sử dụng các giá trị thực tế cho thiết lập Flink và tập dữ liệu bạn muốn tạo:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Để chạy trên cụm Flink được nhúng, bạn có thể khởi chạy công việc bằng lệnh bên dưới:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Với một kịch bản tùy chỉnh

Để tạo tập dữ liệu trên Beam, API cũng giống như các tập dữ liệu khác. Bạn có thể tùy chỉnh beam.Pipeline bằng cách sử dụng các đối số beam_options (và beam_runner ) của DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Triển khai bộ dữ liệu Beam

Điều kiện tiên quyết

Để viết bộ dữ liệu Apache Beam, bạn nên làm quen với các khái niệm sau:

Hướng dẫn

Nếu bạn đã quen với hướng dẫn tạo tập dữ liệu thì việc thêm tập dữ liệu Beam chỉ yêu cầu sửa đổi hàm _generate_examples . Hàm sẽ trả về một đối tượng chùm, thay vì một trình tạo:

Tập dữ liệu không có chùm tia:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Tập dữ liệu chùm:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Tất cả phần còn lại có thể giống nhau 100%, bao gồm cả các bài kiểm tra.

Một số cân nhắc bổ sung:

  • Sử dụng tfds.core.lazy_imports để nhập Apache Beam. Bằng cách sử dụng phần phụ thuộc lười biếng, người dùng vẫn có thể đọc tập dữ liệu sau khi được tạo mà không cần phải cài đặt Beam.
  • Hãy cẩn thận với việc đóng Python. Khi chạy quy trình, các hàm beam.Mapbeam.DoFn được tuần tự hóa bằng cách sử dụng pickle và gửi đến tất cả các công nhân. Không sử dụng các đối tượng có thể thay đổi bên trong một beam.PTransform nếu trạng thái phải được chia sẻ giữa các công nhân.
  • Do cách tfds.core.DatasetBuilder được tuần tự hóa bằng dưa chua, nên việc thay đổi tfds.core.DatasetBuilder trong quá trình tạo dữ liệu sẽ bị bỏ qua đối với các trình chạy (ví dụ: không thể đặt self.info.metadata['offset'] = 123 trong _split_generators và truy cập nó từ các công nhân như beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Nếu cần chia sẻ một số bước quy trình giữa các phần tách, bạn có thể thêm một pipeline: beam.Pipeline kwarg vào _split_generator và kiểm soát quy trình tạo toàn bộ. Xem tài liệu _generate_examples của tfds.core.GeneratorBasedBuilder .

Ví dụ

Đây là một ví dụ về tập dữ liệu Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Chạy đường ống của bạn

Để chạy quy trình, hãy xem phần trên.

tfds build my_dataset --register_checksums

Đường ống sử dụng TFDS làm đầu vào

Nếu bạn muốn tạo một đường dẫn chùm lấy tập dữ liệu TFDS làm nguồn, bạn có thể sử dụng tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Nó sẽ xử lý song song từng phân đoạn của tập dữ liệu.