Một số bộ dữ liệu quá lớn để có thể xử lý trên một máy. tfds
hỗ trợ tạo dữ liệu trên nhiều máy bằng cách sử dụng Apache Beam .
Tài liệu này có hai phần:
- Dành cho người dùng muốn tạo tập dữ liệu Beam hiện có
- Dành cho các nhà phát triển muốn tạo tập dữ liệu Beam mới
Tạo tập dữ liệu Beam
Dưới đây là các ví dụ khác nhau về cách tạo tập dữ liệu Beam, cả trên đám mây hoặc cục bộ.
Trên luồng dữ liệu đám mây của Google
Để chạy quy trình bằng Google Cloud Dataflow và tận dụng tính toán phân tán, trước tiên hãy làm theo hướng dẫn Bắt đầu nhanh .
Sau khi thiết lập môi trường, bạn có thể chạy tfds build
CLI bằng cách sử dụng thư mục dữ liệu trên GCS và chỉ định các tùy chọn bắt buộc cho cờ --beam_pipeline_options
.
Để giúp khởi chạy tập lệnh dễ dàng hơn, bạn nên xác định các biến sau bằng cách sử dụng các giá trị thực tế cho thiết lập GCP/GCS và tập dữ liệu bạn muốn tạo:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Sau đó, bạn sẽ cần tạo một tệp để yêu cầu Dataflow cài đặt tfds
trên các trình chạy:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Nếu bạn đang sử dụng tfds-nightly
, hãy đảm bảo lặp lại từ tfds-nightly
trong trường hợp tập dữ liệu đã được cập nhật kể từ lần phát hành cuối cùng.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Nếu bạn đang sử dụng các phần phụ thuộc bổ sung không có trong thư viện TFDS, hãy làm theo hướng dẫn để quản lý các phần phụ thuộc đường dẫn Python .
Cuối cùng, bạn có thể khởi chạy công việc bằng lệnh bên dưới:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Tại địa phương
Để chạy cục bộ tập lệnh của bạn bằng trình chạy Apache Beam mặc định (nó phải vừa với tất cả dữ liệu trong bộ nhớ), lệnh này cũng giống như đối với các bộ dữ liệu khác:
tfds build my_dataset
Với Apache Flink
Để chạy quy trình bằng Apache Flink, bạn có thể đọc tài liệu chính thức . Đảm bảo Beam của bạn tuân thủ Khả năng tương thích phiên bản Flink
Để giúp khởi chạy tập lệnh dễ dàng hơn, bạn nên xác định các biến sau bằng cách sử dụng các giá trị thực tế cho thiết lập Flink và tập dữ liệu bạn muốn tạo:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Để chạy trên cụm Flink được nhúng, bạn có thể khởi chạy công việc bằng lệnh bên dưới:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Với một kịch bản tùy chỉnh
Để tạo tập dữ liệu trên Beam, API cũng giống như các tập dữ liệu khác. Bạn có thể tùy chỉnh beam.Pipeline
bằng cách sử dụng các đối số beam_options
(và beam_runner
) của DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Triển khai bộ dữ liệu Beam
Điều kiện tiên quyết
Để viết bộ dữ liệu Apache Beam, bạn nên làm quen với các khái niệm sau:
- Làm quen với hướng dẫn tạo tập dữ liệu
tfds
vì hầu hết nội dung vẫn áp dụng cho tập dữ liệu Beam. - Nhận phần giới thiệu về Apache Beam với hướng dẫn lập trình Beam .
- Nếu bạn muốn tạo tập dữ liệu của mình bằng Cloud Dataflow, hãy đọc Tài liệu Google Cloud và hướng dẫn phụ thuộc Apache Beam .
Hướng dẫn
Nếu bạn đã quen với hướng dẫn tạo tập dữ liệu thì việc thêm tập dữ liệu Beam chỉ yêu cầu sửa đổi hàm _generate_examples
. Hàm sẽ trả về một đối tượng chùm, thay vì một trình tạo:
Tập dữ liệu không có chùm tia:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Tập dữ liệu chùm:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Tất cả phần còn lại có thể giống nhau 100%, bao gồm cả các bài kiểm tra.
Một số cân nhắc bổ sung:
- Sử dụng
tfds.core.lazy_imports
để nhập Apache Beam. Bằng cách sử dụng phần phụ thuộc lười biếng, người dùng vẫn có thể đọc tập dữ liệu sau khi được tạo mà không cần phải cài đặt Beam. - Hãy cẩn thận với việc đóng Python. Khi chạy quy trình, các hàm
beam.Map
vàbeam.DoFn
được tuần tự hóa bằng cách sử dụngpickle
và gửi đến tất cả các công nhân. Không sử dụng các đối tượng có thể thay đổi bên trong mộtbeam.PTransform
nếu trạng thái phải được chia sẻ giữa các công nhân. - Do cách
tfds.core.DatasetBuilder
được tuần tự hóa bằng dưa chua, nên việc thay đổitfds.core.DatasetBuilder
trong quá trình tạo dữ liệu sẽ bị bỏ qua đối với các trình chạy (ví dụ: không thể đặtself.info.metadata['offset'] = 123
trong_split_generators
và truy cập nó từ các công nhân nhưbeam.Map(lambda x: x + self.info.metadata['offset'])
) - Nếu cần chia sẻ một số bước quy trình giữa các phần tách, bạn có thể thêm một
pipeline: beam.Pipeline
kwarg vào_split_generator
và kiểm soát quy trình tạo toàn bộ. Xem tài liệu_generate_examples
củatfds.core.GeneratorBasedBuilder
.
Ví dụ
Đây là một ví dụ về tập dữ liệu Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Chạy đường ống của bạn
Để chạy quy trình, hãy xem phần trên.
tfds build my_dataset --register_checksums
Đường ống sử dụng TFDS làm đầu vào
Nếu bạn muốn tạo một đường dẫn chùm lấy tập dữ liệu TFDS làm nguồn, bạn có thể sử dụng tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Nó sẽ xử lý song song từng phân đoạn của tập dữ liệu.