برخی از مجموعههای داده آنقدر بزرگ هستند که نمیتوانند روی یک ماشین پردازش شوند. tfds
از تولید داده در بسیاری از ماشین ها با استفاده از پرتو آپاچی پشتیبانی می کند.
این سند دارای دو بخش است:
- برای کاربرانی که می خواهند یک مجموعه داده Beam موجود ایجاد کنند
- برای توسعه دهندگانی که می خواهند مجموعه داده Beam جدیدی ایجاد کنند
ایجاد یک مجموعه داده Beam
در زیر نمونههای مختلفی از تولید مجموعه دادههای Beam، چه در فضای ابری و چه به صورت محلی، آورده شده است.
در Google Cloud Dataflow
برای اجرای خط لوله با استفاده از Google Cloud Dataflow و استفاده از محاسبات توزیع شده، ابتدا دستورالعملهای شروع سریع را دنبال کنید.
هنگامی که محیط شما راه اندازی شد، می توانید tfds build
CLI را با استفاده از دایرکتوری داده در GCS و مشخص کردن گزینه های مورد نیاز برای پرچم --beam_pipeline_options
اجرا کنید.
برای آسانتر کردن راهاندازی اسکریپت، تعریف متغیرهای زیر با استفاده از مقادیر واقعی برای تنظیم GCP/GCS و مجموعه دادهای که میخواهید ایجاد کنید مفید است:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
سپس باید یک فایل ایجاد کنید تا به Dataflow بگویید tfds
را روی کارگران نصب کند:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
اگر از tfds-nightly
استفاده میکنید، مطمئن شوید که از tfds-nightly
بازتاب داده میشود در صورتی که مجموعه داده از آخرین نسخه بهروزرسانی شده باشد.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
اگر از وابستگیهای اضافی استفاده میکنید که در کتابخانه TFDS گنجانده نشده است ، دستورالعملهای مدیریت وابستگیهای خط لوله پایتون را دنبال کنید.
در نهایت، می توانید کار را با استفاده از دستور زیر اجرا کنید:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
به صورت محلی
برای اجرای اسکریپت خود به صورت محلی با استفاده از اجراکننده پیشفرض پرتو Apache (باید همه دادهها را در حافظه جا دهد)، دستور مانند سایر مجموعههای داده است:
tfds build my_dataset
با آپاچی فلینک
برای اجرای خط لوله با استفاده از Apache Flink می توانید اسناد رسمی را بخوانید. مطمئن شوید که Beam شما با سازگاری نسخه Flink سازگار است
برای آسانتر کردن راهاندازی اسکریپت، تعریف متغیرهای زیر با استفاده از مقادیر واقعی تنظیمات Flink و مجموعه دادهای که میخواهید ایجاد کنید مفید است:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
برای اجرا بر روی یک خوشه Flink تعبیه شده، می توانید کار را با استفاده از دستور زیر اجرا کنید:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
با اسکریپت سفارشی
برای تولید مجموعه داده در Beam، API مانند سایر مجموعههای داده است. شما می توانید beam.Pipeline
با استفاده از آرگومان های beam_options
(و beam_runner
) در DownloadConfig
سفارشی کنید.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
پیاده سازی مجموعه داده Beam
پیش نیازها
برای نوشتن مجموعه داده های Apache Beam باید با مفاهیم زیر آشنا باشید:
- با راهنمای ایجاد مجموعه داده
tfds
آشنا باشید زیرا بیشتر محتوا همچنان برای مجموعه داده های Beam کاربرد دارد. - با راهنمای برنامه نویسی Beam با Apache Beam آشنا شوید.
- اگر میخواهید مجموعه داده خود را با استفاده از Cloud Dataflow ایجاد کنید، Google Cloud Documentation و راهنمای وابستگی Apache Beam را بخوانید.
دستورالعمل ها
اگر با راهنمای ایجاد مجموعه داده آشنا هستید، برای افزودن مجموعه داده Beam فقط باید تابع _generate_examples
تغییر دهید. تابع باید به جای یک مولد، یک شی پرتو را برگرداند:
مجموعه داده غیر پرتو:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
مجموعه داده پرتو:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
همه بقیه می توانند 100٪ یکسان باشند، از جمله آزمایشات.
برخی ملاحظات اضافی:
- برای وارد کردن Apache Beam از
tfds.core.lazy_imports
استفاده کنید. با استفاده از وابستگی تنبل، کاربران همچنان می توانند پس از تولید مجموعه داده بدون نیاز به نصب Beam، آن را بخوانند. - مراقب بسته شدن پایتون باشید. هنگام اجرای خط لوله، توابع
beam.Map
وbeam.DoFn
با استفاده ازpickle
سریال می شوند و برای همه کارگران ارسال می شوند. از اشیای قابل تغییر در داخلbeam.PTransform
استفاده نکنید. اگر حالت باید بین کارگران به اشتراک گذاشته شود، تغییر شکل دهید. - با توجه به نحوه سریال سازی
tfds.core.DatasetBuilder
با pickle، جهشtfds.core.DatasetBuilder
در حین ایجاد داده بر روی کارگران نادیده گرفته می شود (به عنوان مثال تنظیمself.info.metadata['offset'] = 123
در_split_generators
ممکن نیست. و از کارگران مانندbeam.Map(lambda x: x + self.info.metadata['offset'])
به آن دسترسی پیدا کنید. - اگر نیاز دارید که برخی از مراحل خط لوله را بین تقسیمها به اشتراک بگذارید، میتوانید یک
pipeline: beam.Pipeline
kwarg به_split_generator
و کنترل خط لوله تولید کامل. به مستندات_generate_examples
tfds.core.GeneratorBasedBuilder
مراجعه کنید.
مثال
در اینجا نمونه ای از مجموعه داده های Beam آورده شده است.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
اجرای خط لوله شما
برای اجرای خط لوله، به بخش بالا نگاه کنید.
tfds build my_dataset --register_checksums
خط لوله با استفاده از TFDS به عنوان ورودی
اگر می خواهید خط لوله پرتویی ایجاد کنید که یک مجموعه داده TFDS را به عنوان منبع می گیرد، می توانید از tfds.beam.ReadFromTFDS
استفاده کنید:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
هر قطعه از مجموعه داده را به صورت موازی پردازش می کند.