Vertex AI Formation et service avec TFX et Vertex Pipelines

Ce didacticiel basé sur un bloc-notes créera et exécutera un pipeline TFX qui forme un modèle ML à l'aide du service Vertex AI Training et le publie sur Vertex AI pour diffusion.

Ce portable est basé sur le pipeline TFX nous avons construit en simple TFX Pipeline pour Vertex Pipelines Tutorial . Si vous n'avez pas encore lu ce didacticiel, vous devriez le lire avant de continuer avec ce bloc-notes.

Vous pouvez entraîner des modèles sur Vertex AI à l'aide d'AutoML ou utiliser un entraînement personnalisé. Dans la formation personnalisée, vous pouvez sélectionner de nombreux types de machines différents pour alimenter vos tâches de formation, activer la formation distribuée, utiliser le réglage des hyperparamètres et accélérer avec les GPU.

Vous pouvez également répondre aux demandes de prédiction en déployant le modèle formé sur les modèles Vertex AI et en créant un point de terminaison.

Dans ce didacticiel, nous utiliserons Vertex AI Training avec des tâches personnalisées pour entraîner un modèle dans un pipeline TFX. Nous déploierons également le modèle pour répondre à la demande de prédiction à l'aide de Vertex AI.

Ce portable est destiné à être exécuté sur Google Colab ou sur Notebooks plate - forme AI . Si vous n'en utilisez pas, vous pouvez simplement cliquer sur le bouton "Exécuter dans Google Colab" ci-dessus.

Mettre en place

Si vous avez terminé Simple TFX Pipeline pour Vertex Pipelines Tutorial , vous avez un projet GCP de travail et un seau de GCS et qui est tout ce que nous avons besoin pour ce tutoriel. Veuillez d'abord lire le didacticiel préliminaire si vous l'avez manqué.

Installer des packages python

Nous installerons les packages Python requis, notamment TFX et KFP, pour créer des pipelines ML et soumettre des travaux à Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

As-tu redémarré le runtime ?

Si vous utilisez Google Colab, la première fois que vous exécutez la cellule ci-dessus, vous devez redémarrer le runtime en cliquant au-dessus du bouton "RESTART RUNTIME" ou en utilisant le menu "Runtime> Restart runtime ...". Cela est dû à la façon dont Colab charge les packages.

Si vous n'êtes pas sur Colab, vous pouvez redémarrer le runtime avec la cellule suivante.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Connectez-vous à Google pour ce bloc-notes

Si vous exécutez ce notebook sur Colab, authentifiez-vous avec votre compte utilisateur :

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Si vous êtes sur la plate - forme IA Cahiers, carnets, avec Authentifier Google Cloud avant d' exécuter la section suivante, en exécutant

gcloud auth login

dans la fenêtre Terminal (que vous pouvez ouvrir via Fichier> Nouveau dans le menu). Vous n'avez besoin de le faire qu'une seule fois par instance de bloc-notes.

Vérifiez les versions des packages.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Configurer des variables

Nous allons configurer certaines variables utilisées pour personnaliser les pipelines ci-dessous. Les informations suivantes sont requises :

Entrez les valeurs requises dans la cellule ci - dessous avant de l' exécuter.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Set gcloud d'utiliser votre projet.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Préparer des exemples de données

Nous allons utiliser le même ensemble de données Palmer Penguins comme Simple TFX Pipeline Tutorial .

Il y a quatre caractéristiques numériques dans cet ensemble de données qui ont déjà été normalisées pour avoir une plage [0,1]. Nous allons construire un modèle de classification qui prédit les species de manchots.

Nous devons faire notre propre copie de l'ensemble de données. Étant donné que TFX ExampleGen lit les entrées d'un répertoire, nous devons créer un répertoire et y copier l'ensemble de données sur GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Jetez un coup d'œil au fichier CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Créer un pipeline

Notre pipeline sera très similaire à la conduite que nous avons créé en simple TFX Pipeline pour Vertex Pipelines Tutorial . Le pipeline se composera de trois composants, CsvExampleGen, Trainer et Pusher. Mais nous utiliserons un composant spécial Trainer et Pusher. Le composant Trainer déplacera les charges de travail de formation vers Vertex AI, et le composant Pusher publiera le modèle ML formé vers Vertex AI au lieu d'un système de fichiers.

TFX offre un spécial Trainer pour soumettre des travaux de formation Vertex AI Formation continue. Tout ce que nous devons faire est d' utiliser Trainer dans le module d'extension à la place du standard Trainer composant ainsi que certains paramètres de GCP nécessaires.

Dans ce didacticiel, nous exécuterons les tâches Vertex AI Training uniquement en utilisant d'abord des CPU, puis avec un GPU.

TFX fournit également un spécial Pusher pour télécharger le modèle aux modèles Vertex AI. Pusher créera Vertex AI ressources Endpoint pour servir perdictions en ligne, aussi. Voir la documentation Vertex AI pour en savoir plus sur les prévisions en ligne fournies par Vertex AI.

Écrire le code du modèle.

Le modèle lui - même est presque similaire au modèle simple TFX Pipeline Tutorial .

Nous ajouterons _get_distribution_strategy() fonction qui crée une stratégie de distribution de tensorflow et il est utilisé dans run_fn d'utiliser MirroredStrategy si le GPU est disponible.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copiez le fichier du module dans GCS, accessible à partir des composants du pipeline.

Sinon, vous souhaiterez peut-être créer une image de conteneur comprenant le fichier de module et utiliser l'image pour exécuter le pipeline et les tâches AI Platform Training.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Écrire une définition de pipeline

Nous allons définir une fonction pour créer un pipeline TFX. Il a les mêmes trois composants que dans simple TFX Pipeline Tutorial , mais nous utilisons un Trainer et Pusher composant dans le module d'extension de GCP.

tfx.extensions.google_cloud_ai_platform.Trainer se comporte comme un régulier Trainer , mais il se déplace simplement le calcul de la formation du modèle cloud. Il lance une tâche personnalisée dans le service Vertex AI Training et le composant formateur du système d'orchestration attendra simplement la fin de la tâche Vertex AI Training.

tfx.extensions.google_cloud_ai_platform.Pusher crée un Vertex AI modèle et Vertex AI Endpoint en utilisant le modèle formé.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Exécutez le pipeline sur Vertex Pipelines.

Nous utiliserons Vertex Pipelines pour exécuter le pipeline comme nous l' avons fait en simple TFX Pipeline pour Vertex Pipelines Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

Le fichier de définition généré peut être transmis en utilisant Google Cloud client aiplatform dans google-cloud-aiplatform package.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Maintenant , vous pouvez visiter Vertex AI> Pipelines ' dans Google Cloud Console pour voir les progrès.

Tester avec une requête de prédiction

Une fois que le pipeline finalise, vous trouverez un modèle déployé à l'une des extrémités dans Vertex AI> Endpoints. Nous devons connaître l'identifiant du point de terminaison pour envoyer une demande de prédiction au nouveau point de terminaison. Ceci est différent du nom du point final nous sommes entrés ci - dessus. Vous pouvez trouver l'identifiant à la page par Google Cloud Console Endpoints dans Google Cloud Console , il ressemble à un nombre très long.

Définissez ENDPOINT_ID ci-dessous avant de l'exécuter.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Nous utilisons le même client aiplatform pour envoyer une demande au point de terminaison. Nous enverrons une demande de prédiction pour la classification des espèces de pingouins. L'entrée correspond aux quatre caractéristiques que nous avons utilisées et le modèle renverra trois valeurs, car notre modèle génère une valeur pour chaque espèce.

Par exemple, l'exemple spécifique suivant a la plus grande valeur à l'index « 2 » et affichera « 2 ».

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Pour des informations détaillées sur la prévision en ligne, s'il vous plaît visitez la page par Google Cloud Console Endpoints dans Google Cloud Console . vous pouvez trouver un guide sur l'envoi d'échantillons de demandes et des liens vers d'autres ressources.

Exécuter le pipeline à l'aide d'un GPU

Vertex AI prend en charge la formation à l'aide de divers types de machines, y compris la prise en charge des GPU. Voir la référence de la machine pour les options disponibles.

Nous avons déjà défini notre pipeline pour prendre en charge la formation GPU. Tout ce que nous devons faire est de mettre use_gpu drapeau sur True. Ensuite , un pipeline sera créé avec une spécification de la machine dont un NVIDIA_TESLA_K80 et notre code de formation de modèle utilisera tf.distribute.MirroredStrategy .

Notez que use_gpu drapeau ne fait pas partie de l'API Vertex ou TFX. Il est juste utilisé pour contrôler le code d'entraînement dans ce tutoriel.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Maintenant , vous pouvez visiter Vertex AI> Pipelines ' dans Google Cloud Console pour voir les progrès.

Nettoyer

Vous avez créé un modèle Vertex AI et un point de terminaison dans ce didacticiel. S'il vous plaît supprimer ces ressources pour éviter des frais inutiles en allant Endpoints et le modèle de l' annulation du déploiement du premier critère d' évaluation. Ensuite, vous pouvez supprimer le point de terminaison et le modèle séparément.