ชุดข้อมูลบางชุดมีขนาดใหญ่เกินกว่าจะประมวลผลในเครื่องเดียวได้ tfds
รองรับการสร้างข้อมูลในหลาย ๆ เครื่องโดยใช้ Apache Beam
เอกสารนี้มีสองส่วน:
- สำหรับผู้ใช้ที่ต้องการสร้างชุดข้อมูล Beam ที่มีอยู่
- สำหรับนักพัฒนาที่ต้องการสร้างชุดข้อมูล Beam ใหม่
การสร้างชุดข้อมูล Beam
ด้านล่างนี้เป็นตัวอย่างต่างๆ ของการสร้างชุดข้อมูล Beam ทั้งบนคลาวด์หรือในเครื่อง
บน Google Cloud Dataflow
หากต้องการรันไปป์ไลน์โดยใช้ Google Cloud Dataflow และใช้ประโยชน์จากการคำนวณแบบกระจาย ขั้นแรกให้ทำตาม คำแนะนำการเริ่มต้นอย่างรวดเร็ว
เมื่อตั้งค่าสภาพแวดล้อมแล้ว คุณสามารถเรียกใช้ tfds build
CLI ได้ โดยใช้ไดเรกทอรีข้อมูลบน GCS และระบุ ตัวเลือกที่จำเป็น สำหรับแฟล็ก --beam_pipeline_options
เพื่อให้เรียกใช้สคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า GCP/GCS และชุดข้อมูลที่คุณต้องการสร้าง
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
จากนั้น คุณจะต้องสร้างไฟล์เพื่อบอกให้ Dataflow ติดตั้ง tfds
ให้กับผู้ปฏิบัติงาน:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
หากคุณใช้ tfds-nightly
อย่าลืมสะท้อนจาก tfds-nightly
ในกรณีที่ชุดข้อมูลได้รับการอัปเดตตั้งแต่รุ่นล่าสุด
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
หากคุณใช้การขึ้นต่อกันเพิ่มเติมที่ไม่รวมอยู่ในไลบรารี TFDS ให้ทำตาม คำแนะนำในการจัดการการขึ้นต่อกันของไปป์ไลน์ Python
สุดท้าย คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
ในท้องถิ่น
หากต้องการรันสคริปต์ของคุณภายในเครื่องโดยใช้ Apache Beam runner เริ่มต้น (ต้องพอดีกับข้อมูลทั้งหมดในหน่วยความจำ) คำสั่งจะเหมือนกับชุดข้อมูลอื่นๆ:
tfds build my_dataset
ด้วย Apache Flink
หากต้องการรันไปป์ไลน์โดยใช้ Apache Flink คุณสามารถอ่าน เอกสารอย่างเป็นทางการ ได้ ตรวจสอบให้แน่ใจว่า Beam ของคุณสอดคล้องกับ ความเข้ากันได้ของเวอร์ชัน Flink
เพื่อให้เปิดสคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า Flink และชุดข้อมูลที่คุณต้องการสร้าง:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
หากต้องการทำงานบนคลัสเตอร์ Flink แบบฝัง คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
ด้วยสคริปต์ที่กำหนดเอง
ในการสร้างชุดข้อมูลบน Beam นั้น API จะเหมือนกับชุดข้อมูลอื่นๆ คุณสามารถปรับแต่ง beam.Pipeline
ได้โดยใช้อาร์กิวเมนต์ beam_options
(และ beam_runner
) ของ DownloadConfig
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
การใช้ชุดข้อมูล Beam
ข้อกำหนดเบื้องต้น
ในการเขียนชุดข้อมูล Apache Beam คุณควรคุ้นเคยกับแนวคิดต่อไปนี้:
- ทำความคุ้นเคยกับ คู่มือการสร้างชุดข้อมูล
tfds
เนื่องจากเนื้อหาส่วนใหญ่ยังคงใช้กับชุดข้อมูล Beam - รับข้อมูลเบื้องต้นเกี่ยวกับ Apache Beam พร้อม คู่มือการเขียนโปรแกรม Beam
- หากคุณต้องการสร้างชุดข้อมูลโดยใช้ Cloud Dataflow โปรดอ่าน เอกสาร Google Cloud และ คู่มือการพึ่งพา Apache Beam
คำแนะนำ
หากคุณคุ้นเคยกับ คู่มือการสร้างชุดข้อมูล การเพิ่มชุดข้อมูล Beam จะต้องแก้ไขฟังก์ชัน _generate_examples
เท่านั้น ฟังก์ชั่นควรส่งคืนวัตถุลำแสงแทนที่จะเป็นตัวสร้าง:
ชุดข้อมูลที่ไม่ใช่ลำแสง:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
ชุดข้อมูลบีม:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
ส่วนที่เหลือทั้งหมดสามารถเหมือนกันได้ 100% รวมถึงการทดสอบด้วย
ข้อควรพิจารณาเพิ่มเติมบางประการ:
- ใช้
tfds.core.lazy_imports
เพื่อนำเข้า Apache Beam เมื่อใช้การพึ่งพาแบบ Lazy ผู้ใช้ยังสามารถอ่านชุดข้อมูลได้หลังจากที่ถูกสร้างขึ้นโดยไม่ต้องติดตั้ง Beam - ระวังการปิด Python เมื่อรันไปป์ไลน์ ฟังก์ชัน
beam.Map
และbeam.DoFn
จะถูกทำให้เป็นอนุกรมโดยใช้pickle
และส่งไปยังผู้ปฏิบัติงานทุกคน อย่าใช้วัตถุที่ไม่แน่นอนภายในbeam.PTransform
หากต้องมีการแบ่งปันสถานะระหว่างคนงาน - เนื่องจากวิธีที่
tfds.core.DatasetBuilder
ถูกทำให้เป็นอนุกรมด้วย Pickle การกลายพันธุ์tfds.core.DatasetBuilder
ในระหว่างการสร้างข้อมูลจะถูกละเว้นจากผู้ปฏิบัติงาน (เช่น ไม่สามารถตั้งself.info.metadata['offset'] = 123
ใน_split_generators
และเข้าถึงได้จากคนงานเช่นbeam.Map(lambda x: x + self.info.metadata['offset'])
) - หากคุณต้องการแชร์ขั้นตอนไปป์ไลน์ระหว่างการแยก คุณสามารถเพิ่ม
pipeline: beam.Pipeline
kwarg ถึง_split_generator
และควบคุมไปป์ไลน์การสร้างแบบเต็ม ดูเอกสารประกอบ_generate_examples
ของtfds.core.GeneratorBasedBuilder
ตัวอย่าง
นี่คือตัวอย่างของชุดข้อมูล Beam
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
ดำเนินการไปป์ไลน์ของคุณ
หากต้องการรันไปป์ไลน์ โปรดดูที่ส่วนด้านบน
tfds build my_dataset --register_checksums
ไปป์ไลน์โดยใช้ TFDS เป็นอินพุต
หากคุณต้องการสร้างบีมไปป์ไลน์ซึ่งใช้ชุดข้อมูล TFDS เป็นแหล่งที่มา คุณสามารถใช้ tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
โดยจะประมวลผลแต่ละส่วนของชุดข้อมูลแบบขนาน