خط لوله TFX ساده برای خطوط لوله Vertex

این آموزش مبتنی بر نوت بوک یک خط لوله ساده TFX ایجاد می کند و آن را با استفاده از Google Cloud Vertex Pipelines اجرا می کند. این نوت بوک بر اساس خط لوله TFX است که ما در آموزش ساده TFX Pipeline ساختیم. اگر با TFX آشنایی ندارید و هنوز آن آموزش را نخوانده اید، قبل از ادامه کار با این نوت بوک باید آن را مطالعه کنید.

Google Cloud Vertex Pipelines به شما کمک می‌کند تا سیستم‌های ML خود را با هماهنگ کردن گردش کار ML خود به شیوه‌ای بدون سرور، خودکار، نظارت و اداره کنید. می توانید خطوط لوله ML خود را با استفاده از Python با TFX تعریف کنید و سپس خطوط لوله خود را در Google Cloud اجرا کنید. برای اطلاعات بیشتر در مورد خطوط لوله Vertex به مقدمه Vertex Pipelines مراجعه کنید.

این نوت بوک برای اجرا در گوگل کولب یا نوت بوک های پلتفرم هوش مصنوعی در نظر گرفته شده است. اگر از یکی از این موارد استفاده نمی کنید، می توانید به سادگی روی دکمه "Run in Google Colab" در بالا کلیک کنید.

برپایی

قبل از اجرای این نوت بوک، مطمئن شوید که موارد زیر را دارید:

لطفاً اسناد Vertex را برای پیکربندی بیشتر پروژه GCP خود ببینید.

بسته های پایتون را نصب کنید

ما بسته‌های Python مورد نیاز از جمله TFX و KFP را برای نگارش خطوط لوله ML نصب می‌کنیم و کارها را به Vertex Pipelines ارسال می‌کنیم.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

آیا زمان اجرا را مجدداً راه اندازی کردید؟

اگر از Google Colab استفاده می‌کنید، اولین باری که سلول بالا را اجرا می‌کنید، باید با کلیک کردن روی دکمه «راه‌اندازی مجدد زمان اجرا» یا با استفاده از منوی «زمان اجرا > زمان اجرا مجدد ...» زمان اجرا را مجدداً راه‌اندازی کنید. این به دلیل روشی است که Colab بسته ها را بارگذاری می کند.

اگر در Colab نیستید، می توانید زمان اجرا را با سلول زیر راه اندازی مجدد کنید.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

برای این نوت بوک وارد گوگل شوید

اگر این نوت بوک را در Colab اجرا می کنید، با حساب کاربری خود احراز هویت کنید:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

اگر از نوت‌بوک‌های پلتفرم هوش مصنوعی استفاده می‌کنید، قبل از اجرای بخش بعدی، با استفاده از Google Cloud احراز هویت کنید.

gcloud auth login

در پنجره ترمینال (که می توانید از طریق File > New در منو باز کنید). شما فقط باید این کار را یک بار در هر نوت بوک انجام دهید.

نسخه های بسته را بررسی کنید.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

متغیرها را تنظیم کنید

ما برخی از متغیرهای مورد استفاده برای سفارشی کردن خطوط لوله را در زیر تنظیم خواهیم کرد. اطلاعات زیر مورد نیاز است:

قبل از اجرای آن، مقادیر مورد نیاز را در سلول زیر وارد کنید .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

gcloud را برای استفاده از پروژه خود تنظیم کنید.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

داده های نمونه را آماده کنید

ما از همان مجموعه داده Palmer Penguins به عنوان Simple TFX Pipeline Tutorial استفاده خواهیم کرد.

چهار ویژگی عددی در این مجموعه داده وجود دارد که قبلاً برای داشتن محدوده [0،1] نرمال شده بودند. ما یک مدل طبقه بندی خواهیم ساخت که species پنگوئن ها را پیش بینی می کند.

ما باید کپی خودمان از مجموعه داده بسازیم. از آنجایی که TFX ExampleGen ورودی ها را از یک دایرکتوری می خواند، ما باید یک دایرکتوری ایجاد کنیم و مجموعه داده را در GCS در آن کپی کنیم.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

نگاهی گذرا به فایل CSV بیندازید.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

یک خط لوله ایجاد کنید

خطوط لوله TFX با استفاده از API های پایتون تعریف می شوند. ما یک خط لوله تعریف می کنیم که از سه جزء CsvExampleGen، Trainer و Pusher تشکیل شده است. تعریف خط لوله و مدل تقریباً مشابه آموزش ساده TFX Pipeline است.

تنها تفاوت این است که ما نیازی به تنظیم metadata_connection_config نداریم که برای مکان یابی پایگاه داده ML Metadata استفاده می شود. از آنجایی که Vertex Pipelines از یک سرویس ابرداده مدیریت شده استفاده می کند، کاربران نیازی به مراقبت از آن ندارند و ما نیازی به تعیین پارامتر نداریم.

قبل از تعریف خط لوله، ابتدا باید یک کد مدل برای کامپوننت Trainer بنویسیم.

کد مدل را بنویسید

ما از همان کد مدل استفاده خواهیم کرد که در آموزش Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

فایل ماژول را در GCS کپی کنید که از اجزای خط لوله قابل دسترسی است. از آنجایی که آموزش مدل در GCP انجام می شود، باید این تعریف مدل را آپلود کنیم.

در غیر این صورت، ممکن است بخواهید یک تصویر ظرف شامل فایل ماژول بسازید و از تصویر برای اجرای خط لوله استفاده کنید.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

یک تعریف خط لوله بنویسید

ما تابعی را برای ایجاد خط لوله TFX تعریف می کنیم.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

خط لوله را روی خطوط لوله Vertex اجرا کنید.

ما در آموزش Simple TFX Pipeline از LocalDagRunner استفاده کردیم که در محیط محلی اجرا می شود. TFX ارکسترهای متعددی را برای اجرای خط لوله شما فراهم می کند. در این آموزش از Vertex Pipelines همراه با Kubeflow V2 dag runner استفاده خواهیم کرد.

برای اجرای خط لوله باید یک رانر تعریف کنیم. شما خط لوله خود را با استفاده از API های TFX در قالب تعریف خط لوله ما کامپایل خواهید کرد.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

فایل تعریف تولید شده را می توان با استفاده از سرویس گیرنده kfp ارسال کرد.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

اکنون می‌توانید برای مشاهده پیشرفت به «Vertex AI > Pipelines» در Google Cloud Console مراجعه کنید.