Beberapa kumpulan data terlalu besar untuk diproses pada satu mesin. tfds
mendukung pembuatan data di banyak mesin dengan menggunakan Apache Beam .
Dokumen ini memiliki dua bagian:
- Untuk pengguna yang ingin membuat dataset Beam yang sudah ada
- Bagi developer yang ingin membuat dataset Beam baru
Menghasilkan kumpulan data Beam
Di bawah ini adalah contoh berbeda dalam menghasilkan kumpulan data Beam, baik di cloud maupun secara lokal.
Di Aliran Data Google Cloud
Untuk menjalankan pipeline menggunakan Google Cloud Dataflow dan memanfaatkan komputasi terdistribusi, ikuti petunjuk Mulai Cepat terlebih dahulu.
Setelah lingkungan Anda disiapkan, Anda dapat menjalankan CLI tfds build
menggunakan direktori data di GCS dan menentukan opsi yang diperlukan untuk flag --beam_pipeline_options
.
Untuk mempermudah peluncuran skrip, sebaiknya tentukan variabel berikut menggunakan nilai sebenarnya untuk penyiapan GCP/GCS dan set data yang ingin Anda buat:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Anda kemudian perlu membuat file untuk memberi tahu Dataflow agar menginstal tfds
pada pekerja:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Jika Anda menggunakan tfds-nightly
, pastikan untuk melakukan echo dari tfds-nightly
jika kumpulan data telah diperbarui sejak rilis terakhir.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Jika Anda menggunakan dependensi tambahan yang tidak disertakan dalam pustaka TFDS, ikuti petunjuk untuk mengelola dependensi pipa Python .
Terakhir, Anda dapat meluncurkan pekerjaan menggunakan perintah di bawah ini:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Secara lokal
Untuk menjalankan skrip Anda secara lokal menggunakan runner Apache Beam default (harus memuat semua data di memori), perintahnya sama seperti untuk kumpulan data lainnya:
tfds build my_dataset
Dengan Apache Flink
Untuk menjalankan pipeline menggunakan Apache Flink Anda dapat membaca dokumentasi resminya . Pastikan Beam Anda mematuhi Kompatibilitas Versi Flink
Untuk mempermudah peluncuran skrip, sebaiknya tentukan variabel berikut menggunakan nilai sebenarnya untuk penyiapan Flink dan himpunan data yang ingin Anda hasilkan:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Untuk berjalan di klaster Flink tertanam, Anda dapat meluncurkan pekerjaan menggunakan perintah di bawah ini:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Dengan skrip khusus
Untuk menghasilkan dataset di Beam, API-nya sama dengan dataset lainnya. Anda dapat menyesuaikan beam.Pipeline
menggunakan argumen beam_options
(dan beam_runner
) dari DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Menerapkan kumpulan data Beam
Prasyarat
Untuk menulis kumpulan data Apache Beam, Anda harus memahami konsep berikut:
- Pahami panduan pembuatan kumpulan data
tfds
karena sebagian besar konten masih berlaku untuk kumpulan data Beam. - Dapatkan pengenalan Apache Beam dengan panduan pemrograman Beam .
- Jika Anda ingin membuat set data menggunakan Cloud Dataflow, baca Dokumentasi Google Cloud dan panduan ketergantungan Apache Beam .
instruksi
Jika Anda sudah familiar dengan panduan pembuatan dataset , menambahkan dataset Beam hanya perlu mengubah fungsi _generate_examples
. Fungsi tersebut harus mengembalikan objek beam, bukan generator:
Kumpulan data non-balok:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Kumpulan data balok:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Semua yang lain bisa 100% identik, termasuk tes.
Beberapa pertimbangan tambahan:
- Gunakan
tfds.core.lazy_imports
untuk mengimpor Apache Beam. Dengan menggunakan ketergantungan malas, pengguna tetap dapat membaca dataset setelah dibuat tanpa harus menginstal Beam. - Hati-hati dengan penutupan Python. Saat menjalankan pipeline, fungsi
beam.Map
danbeam.DoFn
diserialkan menggunakanpickle
dan dikirim ke semua pekerja. Jangan gunakan objek yang bisa berubah di dalambeam.PTransform
jika statusnya harus dibagikan ke seluruh pekerja. - Karena cara
tfds.core.DatasetBuilder
diserialkan dengan acar, mutasitfds.core.DatasetBuilder
selama pembuatan data akan diabaikan pada pekerja (misalnya, tidak mungkin menyetelself.info.metadata['offset'] = 123
di_split_generators
dan mengaksesnya dari pekerja sepertibeam.Map(lambda x: x + self.info.metadata['offset'])
) - Jika Anda perlu berbagi beberapa langkah pipeline di antara pemisahan, Anda dapat menambahkan
pipeline: beam.Pipeline
kwarg ke_split_generator
dan mengontrol pipeline generasi penuh. Lihat dokumentasi_generate_examples
daritfds.core.GeneratorBasedBuilder
.
Contoh
Berikut adalah contoh kumpulan data Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Menjalankan saluran pipa Anda
Untuk menjalankan pipeline, lihat bagian di atas.
tfds build my_dataset --register_checksums
Pipeline menggunakan TFDS sebagai input
Jika Anda ingin membuat beam pipeline yang menggunakan kumpulan data TFDS sebagai sumber, Anda dapat menggunakan tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Ini akan memproses setiap pecahan kumpulan data secara paralel.