Pelatihan dan Penyajian Vertex AI dengan TFX dan Vertex Pipelines

Tutorial berbasis notebook ini akan membuat dan menjalankan pipeline TFX yang melatih model ML menggunakan layanan Vertex AI Training dan memublikasikannya ke Vertex AI untuk disajikan.

Notebook ini didasarkan pada pipa TFX kami membangun di Wikipedia TFX Pipeline untuk Vertex Pipa Tutorial . Jika Anda belum membaca tutorial itu, Anda harus membacanya sebelum melanjutkan dengan buku catatan ini.

Anda dapat melatih model di Vertex AI menggunakan AutoML, atau menggunakan pelatihan khusus. Dalam pelatihan khusus, Anda dapat memilih berbagai jenis mesin untuk mendukung pekerjaan pelatihan Anda, mengaktifkan pelatihan terdistribusi, menggunakan penyetelan hyperparameter, dan mempercepat dengan GPU.

Anda juga dapat melayani permintaan prediksi dengan menerapkan model terlatih ke Model AI Vertex dan membuat titik akhir.

Dalam tutorial ini, kita akan menggunakan Vertex AI Training dengan tugas khusus untuk melatih model dalam pipeline TFX. Kami juga akan menerapkan model untuk melayani permintaan prediksi menggunakan Vertex AI.

Notebook ini dimaksudkan untuk dijalankan di Google CoLab atau AI Notebook platform . Jika Anda tidak menggunakan salah satunya, Anda cukup mengeklik tombol "Jalankan di Google Colab" di atas.

Mempersiapkan

Jika Anda telah menyelesaikan Sederhana TFX Pipeline untuk Vertex Pipa Tutorial , Anda akan memiliki proyek GCP bekerja dan ember GCS dan itu adalah semua yang kita butuhkan untuk tutorial ini. Silakan baca tutorial pendahuluan terlebih dahulu jika Anda melewatkannya.

Instal paket python

Kami akan menginstal paket Python yang diperlukan termasuk TFX dan KFP untuk membuat pipeline ML dan mengirimkan pekerjaan ke Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Apakah Anda me-restart runtime?

Jika Anda menggunakan Google Colab, pertama kali menjalankan sel di atas, Anda harus memulai ulang runtime dengan mengeklik tombol "RESTART RUNTIME" di atas atau menggunakan menu "Runtime > Restart runtime ...". Ini karena cara Colab memuat paket.

Jika Anda tidak menggunakan Colab, Anda dapat memulai ulang runtime dengan sel berikut.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Masuk ke Google untuk buku catatan ini

Jika Anda menjalankan notebook ini di Colab, autentikasi dengan akun pengguna Anda:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Jika Anda berada di AI Notebook Platform, mengotentikasi dengan Google Cloud sebelum menjalankan bagian berikutnya, dengan menjalankan

gcloud auth login

di jendela Terminal (yang dapat Anda membuka melalui File> New di menu). Anda hanya perlu melakukan ini sekali per instance notebook.

Periksa versi paket.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Mengatur variabel

Kami akan menyiapkan beberapa variabel yang digunakan untuk menyesuaikan jalur pipa di bawah ini. Informasi berikut diperlukan:

Masukkan nilai-nilai yang diperlukan dalam sel di bawah ini sebelum menjalankannya.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Set gcloud untuk menggunakan proyek Anda.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Siapkan contoh data

Kami akan menggunakan yang sama dataset Palmer Penguins sebagai Simple TFX Pipeline Tutorial .

Terdapat empat fitur numerik pada dataset ini yang telah dinormalisasi dengan range [0,1]. Kami akan membangun model klasifikasi yang memprediksi species penguin.

Kita perlu membuat salinan dataset kita sendiri. Karena TFX ExampleGen membaca input dari direktori, kita perlu membuat direktori dan menyalin dataset ke dalamnya di GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Lihat sekilas file CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Buat saluran pipa

Pipa kami akan sangat mirip dengan pipa kita buat di Wikipedia TFX Pipeline untuk Vertex Pipa Tutorial . Pipeline akan terdiri dari tiga komponen, CsvExampleGen, Trainer dan Pusher. Tapi kita akan menggunakan komponen Trainer dan Pusher khusus. Komponen Trainer akan memindahkan beban kerja pelatihan ke Vertex AI, dan komponen Pusher akan memublikasikan model ML terlatih ke Vertex AI, bukan sistem file.

TFX menyediakan khusus Trainer untuk menyerahkan pekerjaan pelatihan untuk layanan Vertex AI Training. Yang harus kita lakukan adalah menggunakan Trainer dalam modul ekstensi bukan standar Trainer komponen bersama dengan beberapa parameter GCP diperlukan.

Dalam tutorial ini, kita akan menjalankan pekerjaan Vertex AI Training hanya menggunakan CPU terlebih dahulu dan kemudian dengan GPU.

TFX juga menyediakan khusus Pusher untuk meng-upload model untuk Vertex AI Model. Pusher akan menciptakan sumber daya Vertex AI Endpoint untuk melayani perdictions online, juga. Lihat dokumentasi Vertex AI untuk mempelajari lebih lanjut tentang prediksi online yang disediakan oleh Vertex AI.

Tulis kode model.

Model itu sendiri hampir mirip dengan model di Wikipedia TFX Pipeline Tutorial .

Kami akan menambahkan _get_distribution_strategy() fungsi yang menciptakan strategi distribusi TensorFlow dan digunakan dalam run_fn menggunakan MirroredStrategy jika GPU tersedia.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Salin file modul ke GCS yang dapat diakses dari komponen pipeline.

Jika tidak, Anda mungkin ingin membuat image container termasuk file modul dan menggunakan image tersebut untuk menjalankan pekerjaan pipeline dan AI Platform Training.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Tulis definisi pipa

Kami akan mendefinisikan fungsi untuk membuat pipa TFX. Ada tiga komponen yang sama seperti di Wikipedia TFX Pipeline Tutorial , tapi kami menggunakan Trainer dan Pusher komponen dalam modul ekstensi GCP.

tfx.extensions.google_cloud_ai_platform.Trainer berperilaku seperti biasa Trainer , tapi itu hanya bergerak perhitungan untuk model pelatihan ke awan. Ini meluncurkan pekerjaan khusus dalam layanan Pelatihan Vertex AI dan komponen pelatih dalam sistem orkestrasi hanya akan menunggu sampai pekerjaan Pelatihan AI Vertex selesai.

tfx.extensions.google_cloud_ai_platform.Pusher menciptakan Vertex AI Model dan Vertex AI Endpoint menggunakan model terlatih.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Jalankan pipeline di Vertex Pipelines.

Kami akan menggunakan Vertex Pipa untuk menjalankan pipa seperti yang kita lakukan di Wikipedia TFX Pipeline untuk Vertex Pipa Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

File definisi yang dihasilkan dapat disampaikan dengan menggunakan Google Cloud aiplatform klien di google-cloud-aiplatform paket.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Sekarang Anda dapat mengunjungi 'Vertex AI> Pipa' di Google Cloud Console untuk melihat kemajuan.

Uji dengan permintaan prediksi

Setelah selesai pipa, Anda akan menemukan model dikerahkan di salah satu titik akhir di 'Vertex AI> Endpoint'. Kita perlu mengetahui id endpoint untuk mengirim permintaan prediksi ke endpoint baru. Hal ini berbeda dari nama endpoint kita masuk di atas. Anda dapat menemukan id di halaman endpoint di Google Cloud Console , sepertinya jumlah yang sangat panjang.

Setel ENDPOINT_ID di bawah ini sebelum menjalankannya.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Kami menggunakan klien aiplatform yang sama untuk mengirim permintaan ke titik akhir. Kami akan mengirimkan permintaan prediksi untuk klasifikasi spesies Penguin. Inputnya adalah empat fitur yang kami gunakan, dan model akan mengembalikan tiga nilai, karena model kami mengeluarkan satu nilai untuk setiap spesies.

Misalnya, contoh spesifik berikut memiliki nilai terbesar pada indeks '2' dan akan mencetak '2'.

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Untuk informasi rinci tentang prediksi online, silakan kunjungi halaman endpoint di Google Cloud Console . Anda dapat menemukan panduan untuk mengirimkan permintaan sampel dan tautan ke lebih banyak sumber daya.

Jalankan pipeline menggunakan GPU

Vertex AI mendukung pelatihan menggunakan berbagai jenis mesin termasuk dukungan untuk GPU. Lihat referensi Machine spesifikasi untuk pilihan yang tersedia.

Kami telah menetapkan saluran kami untuk mendukung pelatihan GPU. Semua yang perlu kita lakukan adalah pengaturan use_gpu bendera ke True. Kemudian pipa akan dibuat dengan spek mesin termasuk satu NVIDIA_TESLA_K80 dan kode pelatihan model kami akan menggunakan tf.distribute.MirroredStrategy .

Perhatikan bahwa use_gpu bendera bukan merupakan bagian dari Vertex atau TFX API. Itu hanya digunakan untuk mengontrol kode pelatihan dalam tutorial ini.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Sekarang Anda dapat mengunjungi 'Vertex AI> Pipa' di Google Cloud Console untuk melihat kemajuan.

Membersihkan

Anda telah membuat Model dan Titik Akhir Vertex AI dalam tutorial ini. Hapus sumber daya ini untuk menghindari biaya yang tidak diinginkan dengan pergi ke Endpoint dan undeploying model dari endpoint pertama. Kemudian Anda dapat menghapus titik akhir dan model secara terpisah.