Apache Beam দিয়ে বড় ডেটাসেট তৈরি করা হচ্ছে

কিছু ডেটাসেট একটি একক মেশিনে প্রক্রিয়া করার জন্য খুব বড়। tfds Apache Beam ব্যবহার করে অনেক মেশিনে ডেটা তৈরি করতে সমর্থন করে।

এই ডকটিতে দুটি বিভাগ রয়েছে:

  • যে ব্যবহারকারীরা একটি বিদ্যমান বীম ডেটাসেট তৈরি করতে চান তাদের জন্য
  • বিকাশকারীদের জন্য যারা একটি নতুন বীম ডেটাসেট তৈরি করতে চান৷

একটি বিম ডেটাসেট তৈরি করা হচ্ছে

নীচে ক্লাউডে বা স্থানীয়ভাবে, একটি বীম ডেটাসেট তৈরি করার বিভিন্ন উদাহরণ রয়েছে৷

Google ক্লাউড ডেটাফ্লোতে

Google ক্লাউড ডেটাফ্লো ব্যবহার করে পাইপলাইন চালানোর জন্য এবং বিতরণ করা গণনার সুবিধা নিতে, প্রথমে Quickstart নির্দেশাবলী অনুসরণ করুন।

একবার আপনার পরিবেশ সেট আপ হয়ে গেলে, আপনি GCS- এ একটি ডেটা ডিরেক্টরি ব্যবহার করে এবং --beam_pipeline_options পতাকার জন্য প্রয়োজনীয় বিকল্পগুলি নির্দিষ্ট করে tfds build CLI চালাতে পারেন।

স্ক্রিপ্ট চালু করা সহজ করতে, আপনার GCP/GCS সেটআপের প্রকৃত মান এবং আপনি যে ডেটাসেট তৈরি করতে চান তা ব্যবহার করে নিম্নলিখিত ভেরিয়েবলগুলিকে সংজ্ঞায়িত করা সহায়ক:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

তারপরে আপনাকে ডেটাফ্লোকে কর্মীদের উপর tfds ইনস্টল করতে বলার জন্য একটি ফাইল তৈরি করতে হবে:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

আপনি যদি tfds-nightly ব্যবহার করেন, তাহলে নিশ্চিত করুন যে tfds-nightly থেকে প্রতিধ্বনি করা হয়েছে যদি শেষ প্রকাশের পর থেকে ডেটাসেট আপডেট করা হয়েছে।

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

আপনি যদি TFDS লাইব্রেরিতে অন্তর্ভুক্ত নয় এমন অতিরিক্ত নির্ভরতা ব্যবহার করেন তাহলে পাইথন পাইপলাইন নির্ভরতা পরিচালনার জন্য নির্দেশাবলী অনুসরণ করুন।

অবশেষে, আপনি নীচের কমান্ডটি ব্যবহার করে কাজটি চালু করতে পারেন:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

স্থানীয়ভাবে

ডিফল্ট Apache Beam রানার ব্যবহার করে স্থানীয়ভাবে আপনার স্ক্রিপ্ট চালানোর জন্য (এটি মেমরিতে সমস্ত ডেটা ফিট করতে হবে), কমান্ডটি অন্যান্য ডেটাসেটের মতোই:

tfds build my_dataset

Apache Flink ব্যবহার করে পাইপলাইন চালানোর জন্য আপনি অফিসিয়াল ডকুমেন্টেশন পড়তে পারেন। নিশ্চিত করুন যে আপনার বীম Flink সংস্করণ সামঞ্জস্যের সাথে সঙ্গতিপূর্ণ

স্ক্রিপ্টটি চালু করা সহজ করার জন্য, আপনার ফ্লিঙ্ক সেটআপের প্রকৃত মান এবং আপনি যে ডেটাসেট তৈরি করতে চান তা ব্যবহার করে নিম্নলিখিত ভেরিয়েবলগুলিকে সংজ্ঞায়িত করা সহায়ক:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

একটি এমবেডেড ফ্লিঙ্ক ক্লাস্টারে চালানোর জন্য, আপনি নীচের কমান্ডটি ব্যবহার করে কাজটি চালু করতে পারেন:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

একটি কাস্টম স্ক্রিপ্ট সঙ্গে

Beam-এ ডেটাসেট তৈরি করতে, API অন্যান্য ডেটাসেটের মতোই। আপনি DownloadConfig এর beam_options (এবং beam_runner ) আর্গুমেন্ট ব্যবহার করে beam.Pipeline কাস্টমাইজ করতে পারেন।

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

একটি রশ্মি ডেটাসেট বাস্তবায়ন

পূর্বশর্ত

Apache Beam ডেটাসেট লেখার জন্য, আপনাকে নিম্নলিখিত ধারণাগুলির সাথে পরিচিত হতে হবে:

নির্দেশনা

আপনি যদি ডেটাসেট তৈরির নির্দেশিকাটির সাথে পরিচিত হন, তবে একটি বিম ডেটাসেট যোগ করার জন্য শুধুমাত্র _generate_examples ফাংশন পরিবর্তন করতে হবে। ফাংশনটি একটি জেনারেটরের পরিবর্তে একটি মরীচি বস্তু প্রদান করা উচিত:

নন-বিম ডেটাসেট:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

রশ্মি ডেটাসেট:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

বাকি সব পরীক্ষা সহ 100% অভিন্ন হতে পারে।

কিছু অতিরিক্ত বিবেচনা:

  • Apache Beam আমদানি করতে tfds.core.lazy_imports ব্যবহার করুন। একটি অলস নির্ভরতা ব্যবহার করে, ব্যবহারকারীরা বীম ইনস্টল না করেই ডেটাসেট তৈরি হওয়ার পরেও পড়তে পারেন।
  • পাইথন বন্ধের সাথে সতর্ক থাকুন। পাইপলাইন চালানোর সময়, beam.Map এবং beam.DoFn ফাংশনগুলি pickle ব্যবহার করে সিরিয়ালাইজ করা হয় এবং সমস্ত কর্মীদের কাছে পাঠানো হয়। একটি beam.PTransform ভিতরে পরিবর্তিত বস্তু ব্যবহার করবেন না। যদি রাষ্ট্রকে শ্রমিকদের মধ্যে ভাগ করতে হয় তাহলে রূপান্তর করুন।
  • যেভাবে tfds.core.DatasetBuilder আচারের সাথে সিরিয়ালাইজ করা হয়েছে, তথ্য তৈরির সময় tfds.core.DatasetBuilder পরিবর্তন করা হয়েছে তা কর্মীদের উপেক্ষা করা হবে (যেমন, _split_generatorsself.info.metadata['offset'] = 123 সেট করা সম্ভব নয় এবং beam.Map(lambda x: x + self.info.metadata['offset']) এর মতো কর্মীদের কাছ থেকে এটি অ্যাক্সেস করুন))
  • আপনি যদি বিভাজনের মধ্যে কিছু পাইপলাইন ধাপ শেয়ার করতে চান, তাহলে আপনি একটি অতিরিক্ত pipeline: beam.Pipeline kwarg to _split_generator এবং পূর্ণ প্রজন্মের পাইপলাইন নিয়ন্ত্রণ করুন। tfds.core.GeneratorBasedBuilder এর _generate_examples ডকুমেন্টেশন দেখুন।

উদাহরণ

এখানে একটি Beam ডেটাসেটের একটি উদাহরণ।

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

আপনার পাইপলাইন চলমান

পাইপলাইন চালানোর জন্য, উপরের বিভাগটি দেখুন।

tfds build my_dataset --register_checksums

ইনপুট হিসাবে TFDS ব্যবহার করে পাইপলাইন

আপনি যদি একটি বীম পাইপলাইন তৈরি করতে চান যা উৎস হিসাবে একটি TFDS ডেটাসেট নেয়, আপনি tfds.beam.ReadFromTFDS ব্যবহার করতে পারেন:

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

এটি সমান্তরালভাবে ডেটাসেটের প্রতিটি শার্ড প্রক্রিয়া করবে।