頂点AIのトレーニングとTFXおよび頂点パイプラインを使用したサービス

このノートブックベースのチュートリアルでは、Vertex AIトレーニングサービスを使用してMLモデルをトレーニングし、それをVertexAIに公開して提供するTFXパイプラインを作成して実行します。

このノートブックは、私たちが組み込まTFXパイプラインに基づいて頂点パイプラインのチュートリアル用のシンプルなTFXパイプライン。そのチュートリアルをまだ読んでいない場合は、このノートブックに進む前に読んでください。

AutoMLを使用してVertexAIでモデルをトレーニングするか、カスタムトレーニングを使用できます。カスタムトレーニングでは、さまざまなマシンタイプを選択して、トレーニングジョブを強化し、分散トレーニングを有効にし、ハイパーパラメータチューニングを使用し、GPUで高速化できます。

トレーニング済みモデルをVertexAIモデルにデプロイし、エンドポイントを作成することで、予測リクエストを処理することもできます。

このチュートリアルでは、カスタムジョブでVertex AIトレーニングを使用して、TFXパイプラインでモデルをトレーニングします。また、Vertex AIを使用して予測リクエストを処理するために、モデルをデプロイします。

このノートブックは、上で実行されることを意図しているGoogleのコラボかにAIプラットフォームノート。これらのいずれかを使用していない場合は、上の[GoogleColabで実行]ボタンをクリックするだけです。

設定

あなたが完了している場合バーテックスパイプラインのチュートリアル用のシンプルなTFXパイプライン、あなたは作業GCPプロジェクトとGCSバケットを持つことになりますし、それは我々が、このチュートリアルに必要なすべてのです。見逃した場合は、最初に予備チュートリアルをお読みください。

Pythonパッケージをインストールする

TFXやKFPなどの必要なPythonパッケージをインストールして、MLパイプラインを作成し、VertexPipelinesにジョブを送信します。

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

ランタイムを再起動しましたか?

上記のセルを初めて実行するときにGoogleColabを使用している場合は、[ランタイムの再起動]ボタンをクリックするか、[ランタイム]> [ランタイムの再起動...]メニューを使用してランタイムを再起動する必要があります。これは、Colabがパッケージをロードする方法が原因です。

Colabを使用していない場合は、次のセルでランタイムを再開できます。

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

このノートブックのためにGoogleにログインします

このノートブックをColabで実行している場合は、ユーザーアカウントで認証します。

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

あなたが実行することによって、次のセクションを実行する前に、AIプラットフォームノートブック、Googleクラウドで認証している場合は

gcloud auth login

ターミナルウィンドウで(あなたがファイルを経由して開くことができます>メニュー内)。これは、ノートブックインスタンスごとに1回だけ実行する必要があります。

パッケージのバージョンを確認してください。

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

変数を設定する

以下のパイプラインをカスタマイズするために使用されるいくつかの変数を設定します。次の情報が必要です。

それを実行する前に、下のセルに必要な値を入力します

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

セットには、 gcloudプロジェクトを使用します。

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

サンプルデータを準備する

私たちは、同じ使用しますパーマーペンギンのデータセットとしてシンプルTFXパイプラインのチュートリアルを

このデータセットには、範囲[0,1]を持つようにすでに正規化されている4つの数値特徴があります。私たちは、予測、分類モデル構築するspeciesのペンギンのを。

データセットの独自のコピーを作成する必要があります。 TFX ExampleGenはディレクトリから入力を読み取るため、ディレクトリを作成し、GCS上のそのディレクトリにデータセットをコピーする必要があります。

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

CSVファイルをざっと見てみましょう。

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

パイプラインを作成する

当社のパイプラインは、私たちが作成したパイプラインに非常に似ていますバーテックスパイプラインのチュートリアル用のシンプルなTFXパイプライン。パイプラインは、CsvExampleGen、Trainer、Pusherの3つのコンポーネントで構成されます。ただし、特別なトレーナーおよびプッシャーコンポーネントを使用します。 TrainerコンポーネントはトレーニングワークロードをVertexAIに移動し、PusherコンポーネントはトレーニングされたMLモデルをファイルシステムではなくVertexAIに公開します。

TFXは、特別提供Trainer頂点AIトレーニングサービスへの研修ジョブを送信するために。私たちがしなければならないのは、使用のあるTrainer拡張モジュールの代わりに、標準でTrainerいくつかの必要なGCPのパラメータを持つに沿った成分。

このチュートリアルでは、最初にCPUを使用し、次にGPUを使用してVertexAIトレーニングジョブを実行します。

TFXはまた、特別提供Pusher頂点AIモデルにモデルをアップロードするを。 Pusher 、あまりにも、オンラインperdictionsを提供するために頂点AIエンドポイントのリソースを作成します。参照頂点AIのマニュアルを頂点AIが提供するオンライン予測についてもっと学ぶために。

モデルコードを記述します。

モデル自体は、中のモデルとほぼ同様であるシンプルTFXパイプラインのチュートリアル

私たちは、追加されます_get_distribution_strategy()作成する関数TensorFlowの流通戦略を、中で使用されているrun_fn GPUが利用可能な場合MirroredStrategyを使用します。

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

モジュールファイルをパイプラインコンポーネントからアクセスできるGCSにコピーします。

それ以外の場合は、モジュールファイルを含むコンテナーイメージを作成し、そのイメージを使用してパイプラインおよびAIプラットフォームトレーニングジョブを実行することをお勧めします。

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

パイプライン定義を書く

TFXパイプラインを作成する関数を定義します。それはと同じ3つのコンポーネントがありシンプルTFXパイプラインのチュートリアルが、我々は使用TrainerPusher GCP拡張モジュール内のコンポーネントを。

tfx.extensions.google_cloud_ai_platform.Trainer 、通常のように振る舞うTrainerが、それは単にクラウドへのモデルのトレーニングのための計算を移動します。 Vertex AIトレーニングサービスでカスタムジョブを起動し、オーケストレーションシステムのトレーナーコンポーネントは、VertexAIトレーニングジョブが完了するまで待機します。

tfx.extensions.google_cloud_ai_platform.Pusher訓練されたモデルを使用して頂点AIモデルと頂点AIエンドポイントを作成します。

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

頂点パイプラインでパイプラインを実行します。

私たちは、私たちが行ったようにパイプラインを実行するために、頂点パイプラインを使用しますバーテックスパイプラインのチュートリアル用のシンプルなTFXのパイプライン

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

生成された定義ファイルがでGoogleクラウドaiplatformクライアントを使用して提出することができるgoogle-cloud-aiplatformパッケージ。

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

今、あなたは訪問することができます「頂点AI>パイプライン」Google Cloud Console進行状況を確認します。

予測リクエストでテストする

パイプラインが完了したら、「頂点AI>エンドポイント」内のエンドポイントの1つで展開モデルを見つけるでしょう。新しいエンドポイントに予測リクエストを送信するには、エンドポイントのIDを知る必要があります。これは、我々が上記の入力されたエンドポイント名と異なっています。あなたがでIDを見つけることができますエンドポイントのページGoogle Cloud Console 、それは非常に長い数のように見えます。

実行する前に、以下のENDPOINT_IDを設定してください。

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

同じaiplatformクライアントを使用して、エンドポイントにリクエストを送信します。ペンギン種分類の予測リクエストを送信します。入力は使用した4つの特徴であり、モデルは種ごとに1つの値を出力するため、モデルは3つの値を返します。

たとえば、次の特定の例では、インデックス「2」に最大の値があり、「2」と出力されます。

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

オンライン予測の詳細については、下記をご覧くださいエンドポイントのページGoogle Cloud Console 。サンプルリクエストの送信に関するガイドと、より多くのリソースへのリンクを見つけることができます。

GPUを使用してパイプラインを実行する

Vertex AIは、GPUのサポートを含むさまざまなマシンタイプを使用したトレーニングをサポートします。参照してください。マシンスペック参照可能なオプションのために。

GPUトレーニングをサポートするパイプラインをすでに定義しました。私たちがやらなければならないことは、設定されたuse_gpuフラグをTrueに。その後、パイプラインは1 NVIDIA_TESLA_K80含むマシンスペックを使用して作成され、我々のモデルのトレーニングコードが使用されますtf.distribute.MirroredStrategy

注意use_gpuフラグが頂点またはTFX APIの一部ではありません。このチュートリアルでは、トレーニングコードを制御するために使用されています。

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

今、あなたは訪問することができます「頂点AI>パイプライン」Google Cloud Console進行状況を確認します。

清掃

このチュートリアルでは、VertexAIモデルとエンドポイントを作成しました。行くことによって、不要な費用を避けるために、これらのリソースを削除してくださいエンドポイントと最初のエンドポイントからのモデルのアンデプロイ。次に、エンドポイントとモデルを別々に削除できます。