بعض مجموعات البيانات كبيرة جدًا بحيث لا يمكن معالجتها على جهاز واحد. يدعم tfds إنشاء البيانات عبر العديد من الأجهزة باستخدام Apache Beam .
يحتوي هذا المستند على قسمين:
- للمستخدم الذي يريد إنشاء مجموعة بيانات Beam موجودة
- للمطورين الذين يرغبون في إنشاء مجموعة بيانات Beam جديدة
إنشاء مجموعة بيانات الشعاع
فيما يلي أمثلة مختلفة لإنشاء مجموعة بيانات Beam، سواء على السحابة أو محليًا.
على Google Cloud Dataflow
لتشغيل المسار باستخدام Google Cloud Dataflow والاستفادة من الحساب الموزع، اتبع أولاً تعليمات Quickstart .
بمجرد إعداد بيئتك، يمكنك تشغيل واجهة سطر الأوامر الخاصة tfds build باستخدام دليل البيانات في GCS وتحديد الخيارات المطلوبة لعلامة --beam_pipeline_options .
لتسهيل تشغيل البرنامج النصي، من المفيد تحديد المتغيرات التالية باستخدام القيم الفعلية لإعداد GCP/GCS ومجموعة البيانات التي تريد إنشاءها:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
ستحتاج بعد ذلك إلى إنشاء ملف لإخبار Dataflow بتثبيت tfds على العمال:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
إذا كنت تستخدم tfds-nightly ، فتأكد من تكرار صدى tfds-nightly في حالة تحديث مجموعة البيانات منذ الإصدار الأخير.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
إذا كنت تستخدم تبعيات إضافية غير مضمنة في مكتبة TFDS، فاتبع الإرشادات الخاصة بإدارة تبعيات خطوط أنابيب Python .
وأخيرًا، يمكنك بدء المهمة باستخدام الأمر أدناه:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
محليا
لتشغيل البرنامج النصي محليًا باستخدام مشغل Apache Beam الافتراضي (يجب أن يلائم جميع البيانات الموجودة في الذاكرة)، يكون الأمر هو نفسه المستخدم في مجموعات البيانات الأخرى:
tfds build my_dataset
مع أباتشي فلينك
لتشغيل خط الأنابيب باستخدام Apache Flink، يمكنك قراءة الوثائق الرسمية . تأكد من أن Beam الخاص بك متوافق مع توافق إصدار Flink
لتسهيل تشغيل البرنامج النصي، من المفيد تحديد المتغيرات التالية باستخدام القيم الفعلية لإعداد Flink ومجموعة البيانات التي تريد إنشاءها:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
للتشغيل على مجموعة Flink المضمنة، يمكنك تشغيل المهمة باستخدام الأمر أدناه:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
مع برنامج نصي مخصص
لإنشاء مجموعة البيانات على Beam، تكون واجهة برمجة التطبيقات هي نفسها المستخدمة في مجموعات البيانات الأخرى. يمكنك تخصيص beam.Pipeline باستخدام وسيطات beam_options (و beam_runner ) الخاصة بـ DownloadConfig .
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
تنفيذ مجموعة بيانات الشعاع
المتطلبات الأساسية
لكتابة مجموعات بيانات Apache Beam، يجب أن تكون على دراية بالمفاهيم التالية:
- كن على دراية بدليل إنشاء مجموعة بيانات
tfdsحيث أن معظم المحتوى لا يزال ينطبق على مجموعات بيانات Beam. - احصل على مقدمة عن Apache Beam من خلال دليل برمجة Beam .
- إذا كنت تريد إنشاء مجموعة البيانات الخاصة بك باستخدام Cloud Dataflow، فاقرأ وثائق Google Cloud ودليل تبعية Apache Beam .
تعليمات
إذا كنت على دراية بدليل إنشاء مجموعة البيانات ، فإن إضافة مجموعة بيانات Beam تتطلب فقط تعديل وظيفة _generate_examples . يجب أن تقوم الدالة بإرجاع كائن شعاع، بدلاً من مولد:
مجموعة البيانات غير الشعاعية:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
مجموعة بيانات الشعاع:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
كل الباقي يمكن أن يكون متطابقًا بنسبة 100%، بما في ذلك الاختبارات.
بعض الاعتبارات الإضافية:
- استخدم
tfds.core.lazy_importsلاستيراد Apache Beam. باستخدام التبعية البطيئة، لا يزال بإمكان المستخدمين قراءة مجموعة البيانات بعد إنشائها دون الحاجة إلى تثبيت Beam. - كن حذرًا مع عمليات إغلاق بايثون. عند تشغيل خط الأنابيب، يتم إجراء تسلسل لوظائف
beam.Mapوbeam.DoFnباستخدامpickleوإرسالها إلى جميع العمال. لا تستخدم كائنات قابلة للتغيير داخلbeam.PTransformإذا كان يجب مشاركة الحالة بين العمال. - نظرًا للطريقة التي يتم بها تسلسل
tfds.core.DatasetBuilderباستخدام المخلل، سيتم تجاهل تغييرtfds.core.DatasetBuilderأثناء إنشاء البيانات على العاملين (على سبيل المثال، ليس من الممكن تعيينself.info.metadata['offset'] = 123في_split_generatorsوالوصول إليها من العمال مثلbeam.Map(lambda x: x + self.info.metadata['offset'])) - إذا كنت بحاجة إلى مشاركة بعض خطوات خط الأنابيب بين الانقسامات، فيمكنك إضافة إضافة
pipeline: beam.Pipelinekwarg to_split_generatorوالتحكم في خط أنابيب الجيل الكامل. راجع وثائق_generate_examplesالخاصة بـtfds.core.GeneratorBasedBuilder.
مثال
فيما يلي مثال لمجموعة بيانات Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
تشغيل خط الأنابيب الخاص بك
لتشغيل خط الأنابيب، قم بإلقاء نظرة على القسم أعلاه.
tfds build my_dataset --register_checksums
خط أنابيب يستخدم TFDS كمدخل
إذا كنت تريد إنشاء خط أنابيب شعاعي يأخذ مجموعة بيانات TFDS كمصدر، فيمكنك استخدام tfds.beam.ReadFromTFDS :
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
سيتم معالجة كل جزء من مجموعة البيانات بالتوازي.