אימון והגשה של Vertex AI עם TFX ו-Vertex Pipelines

הדרכה מבוססת מחברת זו תיצור ותפעיל צינור TFX אשר מאמן מודל ML באמצעות שירות Vertex AI Training ומפרסם אותו ל-Vertex AI לצורך הגשה.

מחברת זו מבוססת על צינור TFX שבנינו צינור TFX פשוט עבור הדרכת צינורות ורטקס . אם עדיין לא קראת את המדריך הזה, עליך לקרוא אותו לפני שתמשיך עם מחברת זו.

אתה יכול לאמן דגמים על Vertex AI באמצעות AutoML, או להשתמש באימון מותאם אישית. באימון מותאם אישית, אתה יכול לבחור סוגי מכונות רבים ושונים כדי להפעיל את עבודות האימון שלך, לאפשר אימון מבוזר, להשתמש בכוונון היפרפרמטרים ולהאיץ עם GPUs.

אתה יכול גם להגיש בקשות חיזוי על ידי פריסת המודל המאומן ל-Vertex AI Models ויצירת נקודת קצה.

במדריך זה, נשתמש ב-Vertex AI Training עם עבודות מותאמות אישית כדי להכשיר מודל בצינור TFX. אנו גם נפרוס את המודל לשרת בקשת חיזוי באמצעות Vertex AI.

מחברת זה מיועד שיופעל על גוגל Colab או על מחשבים ניידים פלטפורמת AI . אם אינך משתמש באחד מאלה, תוכל פשוט ללחוץ על כפתור "הפעל ב-Google Colab" למעלה.

להכין

אם השלמת פשוט TFX צינור ורטקס צינורות הדרכה , תצטרך פרויקט GCP עבודה ודלי GCS וזה כל מה שאנחנו צריכים בשביל הדרכה זו. אנא קרא תחילה את המדריך המקדים אם פספסת אותו.

התקן חבילות python

נתקין חבילות Python הנדרשות כולל TFX ו-KFP כדי ליצור צינורות ML ולהגיש עבודות ל-Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

הפעלת מחדש את זמן הריצה?

אם אתה משתמש ב-Google Colab, בפעם הראשונה שאתה מפעיל את התא שלמעלה, עליך להפעיל מחדש את זמן הריצה על ידי לחיצה מעל לחצן "התחל ריצה מחדש" או שימוש בתפריט "זמן ריצה > הפעל מחדש זמן ריצה...". זה בגלל האופן שבו קולאב טוען חבילות.

אם אינך ב-Colab, תוכל להפעיל מחדש את זמן הריצה עם התא הבא.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

היכנס ל-Google עבור מחברת זו

אם אתה מפעיל מחברת זו ב-Colab, בצע אימות עם חשבון המשתמש שלך:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

אם אתה על מחשבים ניידים פלטפורמת AI, אימות ב- Google Cloud Print לפני הריצה בסעיף הבא, על ידי הפעלה

gcloud auth login

בחלון המסוף (שבו אתה יכול לפתוח באמצעות קובץ> חדש בתפריט). אתה צריך לעשות זאת רק פעם אחת בכל מופע של מחברת.

בדוק את גרסאות החבילה.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

הגדר משתנים

אנו נגדיר כמה משתנים המשמשים להתאמה אישית של הצינורות להלן. יש צורך במידע הבא:

הזן את הערכים הנדרשים בתא שמתחתיו לפני הפעלתו.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

סט gcloud להשתמש בפרויקט שלך.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

הכן נתונים לדוגמה

נשתמש באותה במערך הפינגווינים פאלמר כפי Simple TFX צינור הדרכה .

ישנן ארבע תכונות מספריות במערך נתונים זה שכבר נורמלו לטווח [0,1]. אנחנו נבנינו מודל סיווג אשר חוזה את species של פינגווינים.

אנחנו צריכים ליצור עותק משלנו של מערך הנתונים. מכיוון ש-TFX ExampleGen קורא קלט מספריה, עלינו ליצור ספרייה ולהעתיק אליה מערך נתונים ב-GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

עיין במהירות בקובץ ה-CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

צור צינור

הצינור שלנו יהיה מאוד דומה בצנרת שיצרנו פשוט TFX צינור ורטקס צינורות הדרכה . הצינור יהיה מורכב משלושה רכיבים, CsvExampleGen, Trainer ו-Pusher. אבל נשתמש ברכיב מיוחד של Trainer ו-Pusher. רכיב ה-Trainer יעביר עומסי אימון ל-Vertex AI, ורכיב ה-Pusher יפרסם את מודל ה-ML המאומן ל-Vertex AI במקום למערכת קבצים.

TFX מספק מיוחד Trainer להגיש עבודות הכשרה לשירות הדרכה ורטקס AI. כל שעלינו לעשות הוא השימוש Trainer במודול הרחבה במקום תקן Trainer מרכיב יחד עם כמה פרמטרים GCP הנדרש.

במדריך זה, נריץ עבודות Vertex AI Training רק באמצעות CPUs תחילה ולאחר מכן עם GPU.

TFX מספק גם מיוחד Pusher להעלות את מודל מודלי AI ורטקס. Pusher תיצור משאב Endpoint ורטקס AI לשרת perdictions באינטרנט, מדי. ראה תיעוד ורטקס AI כדי ללמוד עוד על תחזיות באינטרנט שמספקת ורטקס AI.

כתוב קוד דגם.

המודל עצמו הוא כמעט דומה למודל פשוט TFX צינור הדרכה .

נוסיף _get_distribution_strategy() הפונקציה היוצרת אסטרטגיית ההפצה TensorFlow וזה משמש run_fn להשתמש MirroredStrategy אם GPU הוא זמין.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

העתק את קובץ המודול ל-GCS שניתן לגשת אליו ממרכיבי הצינור.

אחרת, ייתכן שתרצה לבנות תמונת מיכל הכוללת את קובץ המודול ולהשתמש בתמונה כדי להפעיל את עבודות הצינור ו-AI Platform Training.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

כתוב הגדרת צינור

נגדיר פונקציה ליצירת צינור TFX. יש לו את אותם שלושה רכיבים כמו Simple TFX צינור הדרכה , אבל אנחנו משתמשים Trainer ו Pusher רכיב במודול הרחבה GCP.

tfx.extensions.google_cloud_ai_platform.Trainer מתנהג כמו רגיל Trainer , אבל זה רק מעביר את החישוב עבור הכשרת מודל הענן. הוא משיק עבודה מותאמת אישית בשירות Vertex AI Training ורכיב המאמן במערכת התזמור פשוט ימתין עד שתסתיים עבודת Vertex AI Training.

tfx.extensions.google_cloud_ai_platform.Pusher יוצר דגם AI ורטקס ו Endpoint AI ורטקס באמצעות מודל מודרך.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

הפעל את הצינור ב-Vertex Pipelines.

נשתמש ורטקס צינורות לרוץ בצנרת כפי שעשינו פשוט TFX צינור ורטקס צינורות הדרכה .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

קובץ ההגדרות שנוצר ניתן להגיש באמצעות לקוח Google Cloud aiplatform ב google-cloud-aiplatform החבילה.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

עכשיו אתם יכולים לבקר "ורטקס AI> צנרת" ב Google Cloud Console כדי לראות את ההתקדמות.

בדיקה עם בקשת חיזוי

לאחר משלים צינור, תמצאו מודל פרוסים על אחד הקצה ב "ורטקס AI> נקודות קצה". אנחנו צריכים לדעת את המזהה של נקודת הקצה כדי לשלוח בקשת חיזוי לנקודת הקצה החדשה. זה שונה משם הסיום נכנסנו לעיל. אתה יכול למצוא את מזהה את הדף נקודות קצה ב- Google Cloud Console , זה נראה כמו מספר רב מאוד.

הגדר את ENDPOINT_ID למטה לפני הפעלתו.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

אנו משתמשים באותו לקוח aiplatform כדי לשלוח בקשה לנקודת הקצה. אנו נשלח בקשת חיזוי לסיווג מיני פינגווין. הקלט הוא ארבעת התכונות שהשתמשנו בהן, והמודל יחזיר שלושה ערכים, מכיוון שהמודל שלנו מוציא ערך אחד עבור כל מין.

לדוגמה, הדוגמה הספציפית הבאה היא בעלת הערך הגדול ביותר באינדקס '2' והיא תדפיס '2'.

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

לקבלת מידע מפורט על חיזוי באינטרנט, בקר בדף נקודות קצה ב- Google Cloud Console . תוכל למצוא מדריך לשליחת בקשות לדוגמה וקישורים למשאבים נוספים.

הפעל את הצינור באמצעות GPU

Vertex AI תומך באימון באמצעות סוגי מכונות שונים כולל תמיכה ב-GPUs. ראה התייחסות המפרט המכונית עבור אפשרויות זמינות.

כבר הגדרנו את הצינור שלנו לתמיכה באימוני GPU. כל מה שאנחנו צריכים לעשות הוא הגדרת use_gpu דגל ל- True. ואז צינור ייוצר עם מפרט מכונית כולל אחד NVIDIA_TESLA_K80 וקוד הכשרת המודל שלנו ישתמש tf.distribute.MirroredStrategy .

שים לב use_gpu הדגל הוא לא חלק של API ורטקס או TFX. הוא משמש רק כדי לשלוט בקוד ההדרכה במדריך זה.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

עכשיו אתם יכולים לבקר "ורטקס AI> צנרת" ב Google Cloud Console כדי לראות את ההתקדמות.

ניקיון

יצרת מודל ונקודת קצה של Vertex AI במדריך זה. אנא מחק משאבים אלו כדי למנוע חיובים לא רצויים על ידי הולך נקודות קצה ו undeploying המודל מן הקצה הראשון. לאחר מכן תוכל למחוק את נקודת הקצה ואת המודל בנפרד.