การสร้างชุดข้อมูลขนาดใหญ่ด้วย Apache Beam

ชุดข้อมูลบางชุดมีขนาดใหญ่เกินกว่าจะประมวลผลในเครื่องเดียวได้ tfds รองรับการสร้างข้อมูลในหลาย ๆ เครื่องโดยใช้ Apache Beam

เอกสารนี้มีสองส่วน:

  • สำหรับผู้ใช้ที่ต้องการสร้างชุดข้อมูล Beam ที่มีอยู่
  • สำหรับนักพัฒนาที่ต้องการสร้างชุดข้อมูล Beam ใหม่

การสร้างชุดข้อมูล Beam

ด้านล่างนี้เป็นตัวอย่างต่างๆ ของการสร้างชุดข้อมูล Beam ทั้งบนคลาวด์หรือในเครื่อง

บน Google Cloud Dataflow

หากต้องการรันไปป์ไลน์โดยใช้ Google Cloud Dataflow และใช้ประโยชน์จากการคำนวณแบบกระจาย ขั้นแรกให้ทำตาม คำแนะนำการเริ่มต้นอย่างรวดเร็ว

เมื่อตั้งค่าสภาพแวดล้อมแล้ว คุณสามารถเรียกใช้ tfds build CLI ได้ โดยใช้ไดเรกทอรีข้อมูลบน GCS และระบุ ตัวเลือกที่จำเป็น สำหรับแฟล็ก --beam_pipeline_options

เพื่อให้เรียกใช้สคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า GCP/GCS และชุดข้อมูลที่คุณต้องการสร้าง

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

จากนั้น คุณจะต้องสร้างไฟล์เพื่อบอกให้ Dataflow ติดตั้ง tfds ให้กับผู้ปฏิบัติงาน:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

หากคุณใช้ tfds-nightly อย่าลืมสะท้อนจาก tfds-nightly ในกรณีที่ชุดข้อมูลได้รับการอัปเดตตั้งแต่รุ่นล่าสุด

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

หากคุณใช้การขึ้นต่อกันเพิ่มเติมที่ไม่รวมอยู่ในไลบรารี TFDS ให้ทำตาม คำแนะนำในการจัดการการขึ้นต่อกันของไปป์ไลน์ Python

สุดท้าย คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

ในท้องถิ่น

หากต้องการรันสคริปต์ของคุณภายในเครื่องโดยใช้ Apache Beam runner เริ่มต้น (ต้องพอดีกับข้อมูลทั้งหมดในหน่วยความจำ) คำสั่งจะเหมือนกับชุดข้อมูลอื่นๆ:

tfds build my_dataset

หากต้องการรันไปป์ไลน์โดยใช้ Apache Flink คุณสามารถอ่าน เอกสารอย่างเป็นทางการ ได้ ตรวจสอบให้แน่ใจว่า Beam ของคุณสอดคล้องกับ ความเข้ากันได้ของเวอร์ชัน Flink

เพื่อให้เปิดสคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า Flink และชุดข้อมูลที่คุณต้องการสร้าง:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

หากต้องการทำงานบนคลัสเตอร์ Flink แบบฝัง คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

ด้วยสคริปต์ที่กำหนดเอง

ในการสร้างชุดข้อมูลบน Beam นั้น API จะเหมือนกับชุดข้อมูลอื่นๆ คุณสามารถปรับแต่ง beam.Pipeline ได้โดยใช้อาร์กิวเมนต์ beam_options (และ beam_runner ) ของ DownloadConfig

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

การใช้ชุดข้อมูล Beam

ข้อกำหนดเบื้องต้น

ในการเขียนชุดข้อมูล Apache Beam คุณควรคุ้นเคยกับแนวคิดต่อไปนี้:

คำแนะนำ

หากคุณคุ้นเคยกับ คู่มือการสร้างชุดข้อมูล การเพิ่มชุดข้อมูล Beam จะต้องแก้ไขฟังก์ชัน _generate_examples เท่านั้น ฟังก์ชั่นควรส่งคืนวัตถุลำแสงแทนที่จะเป็นตัวสร้าง:

ชุดข้อมูลที่ไม่ใช่ลำแสง:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

ชุดข้อมูลบีม:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

ส่วนที่เหลือทั้งหมดสามารถเหมือนกันได้ 100% รวมถึงการทดสอบด้วย

ข้อควรพิจารณาเพิ่มเติมบางประการ:

  • ใช้ tfds.core.lazy_imports เพื่อนำเข้า Apache Beam เมื่อใช้การพึ่งพาแบบ Lazy ผู้ใช้ยังสามารถอ่านชุดข้อมูลได้หลังจากที่ถูกสร้างขึ้นโดยไม่ต้องติดตั้ง Beam
  • ระวังการปิด Python เมื่อรันไปป์ไลน์ ฟังก์ชัน beam.Map และ beam.DoFn จะถูกทำให้เป็นอนุกรมโดยใช้ pickle และส่งไปยังผู้ปฏิบัติงานทุกคน อย่าใช้วัตถุที่ไม่แน่นอนภายใน beam.PTransform หากต้องมีการแบ่งปันสถานะระหว่างคนงาน
  • เนื่องจากวิธีที่ tfds.core.DatasetBuilder ถูกทำให้เป็นอนุกรมด้วย Pickle การกลายพันธุ์ tfds.core.DatasetBuilder ในระหว่างการสร้างข้อมูลจะถูกละเว้นจากผู้ปฏิบัติงาน (เช่น ไม่สามารถตั้ง self.info.metadata['offset'] = 123 ใน _split_generators และเข้าถึงได้จากคนงานเช่น beam.Map(lambda x: x + self.info.metadata['offset']) )
  • หากคุณต้องการแชร์ขั้นตอนไปป์ไลน์ระหว่างการแยก คุณสามารถเพิ่ม pipeline: beam.Pipeline kwarg ถึง _split_generator และควบคุมไปป์ไลน์การสร้างแบบเต็ม ดูเอกสารประกอบ _generate_examples ของ tfds.core.GeneratorBasedBuilder

ตัวอย่าง

นี่คือตัวอย่างของชุดข้อมูล Beam

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

ดำเนินการไปป์ไลน์ของคุณ

หากต้องการรันไปป์ไลน์ โปรดดูที่ส่วนด้านบน

tfds build my_dataset --register_checksums

ไปป์ไลน์โดยใช้ TFDS เป็นอินพุต

หากคุณต้องการสร้างบีมไปป์ไลน์ซึ่งใช้ชุดข้อมูล TFDS เป็นแหล่งที่มา คุณสามารถใช้ tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

โดยจะประมวลผลแต่ละส่วนของชุดข้อมูลแบบขนาน