Vertex AI Training and Serving with TFX and Vertex Pipelines

Este tutorial baseado em notebook criará e executará um pipeline TFX que treina um modelo de ML usando o serviço Vertex AI Training e o publica no Vertex AI para servir.

Este notebook é baseado no gasoduto TFX nós construímos em simples TFX Pipeline para Vertex Pipelines Tutorial . Se você ainda não leu esse tutorial, deve lê-lo antes de prosseguir com este bloco de notas.

Você pode treinar modelos no Vertex AI usando AutoML ou usar o treinamento personalizado. No treinamento personalizado, você pode selecionar muitos tipos de máquina diferentes para impulsionar seus jobs de treinamento, habilitar o treinamento distribuído, usar o ajuste de hiperparâmetros e acelerar com GPUs.

Você também pode atender às solicitações de previsão implantando o modelo treinado nos modelos Vertex AI e criando um endpoint.

Neste tutorial, usaremos o Vertex AI Training com tarefas personalizadas para treinar um modelo em um pipeline TFX. Também implantaremos o modelo para atender à solicitação de predição usando Vertex AI.

Este notebook foi concebido para ser executado em Google Colab ou no AI Plataforma Notebooks . Se você não estiver usando um desses, basta clicar no botão "Executar no Google Colab" acima.

Configurar

Se você tiver concluído simples TFX Pipeline para Vertex Pipelines Tutorial , você terá um projeto GCP trabalhando e um balde GCS e isso é tudo que precisamos para este tutorial. Por favor, leia o tutorial preliminar primeiro, se você o perdeu.

Instale pacotes python

Instalaremos os pacotes Python necessários, incluindo TFX e KFP, para criar pipelines de ML e enviar trabalhos para Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Você reiniciou o tempo de execução?

Se você estiver usando o Google Colab, na primeira vez que executar a célula acima, você deve reiniciar o tempo de execução clicando acima do botão "RESTART RUNTIME" ou usando o menu "Runtime> Restart runtime ...". Isso ocorre devido à maneira como o Colab carrega os pacotes.

Se você não estiver no Colab, pode reiniciar o tempo de execução com a célula a seguir.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Faça login no Google para este notebook

Se você estiver executando este notebook no Colab, autentique-se com sua conta de usuário:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Se você estiver em AI Plataforma Notebooks, autenticar com o Google Cloud antes de executar a próxima seção, executando

gcloud auth login

na janela Terminal (que pode ser aberto através de File> New no menu). Você só precisa fazer isso uma vez por instância de notebook.

Verifique as versões do pacote.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Configurar variáveis

Vamos configurar algumas variáveis ​​usadas para personalizar os pipelines abaixo. As seguintes informações são necessárias:

  • ID do projeto GCP. Ver Identificar o seu ID de projeto .
  • Região do GCP para executar pipelines. Para mais informações sobre as regiões que Vertex Pipelines está disponível em, consulte o guia de locais Vertex AI .
  • Bucket do Google Cloud Storage para armazenar saídas de pipeline.

Insira os valores necessários na célula abaixo antes de executá-lo.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Set gcloud usar seu projeto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Prepare dados de exemplo

Vamos usar o mesmo Palmer Penguins conjunto de dados como Simples TFX Pipeline Tutorial .

Existem quatro recursos numéricos neste conjunto de dados que já foram normalizados para ter intervalo [0,1]. Vamos construir um modelo de classificação que prevê as species de pingüins.

Precisamos fazer nossa própria cópia do conjunto de dados. Como o TFX ExampleGen lê as entradas de um diretório, precisamos criar um diretório e copiar o conjunto de dados para ele no GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Dê uma olhada rápida no arquivo CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Crie um pipeline

Nossa gasoduto será muito semelhante ao gasoduto que criamos no Simples TFX Pipeline para Vertex Pipelines Tutorial . O pipeline consistirá em três componentes, CsvExampleGen, Trainer e Pusher. Mas usaremos um componente especial Trainer e Pusher. O componente Trainer moverá as cargas de trabalho de treinamento para o Vertex AI e o componente Pusher publicará o modelo ML treinado para o Vertex AI em vez de um sistema de arquivos.

TFX fornece um especial Trainer para enviar trabalhos de treinamento para serviço Vertex AI Training. Tudo que temos a fazer é usar o Trainer no módulo de extensão em vez do padrão Trainer componente juntamente com alguns parâmetros GCP necessários.

Neste tutorial, executaremos as tarefas de treinamento do Vertex AI apenas usando CPUs primeiro e depois com uma GPU.

TFX também fornece um especial Pusher para carregar o modelo para Vertex AI Models. Pusher criará recurso Vertex AI Endpoint para servir perdictions on-line, também. Consulte a documentação Vertex AI para saber mais sobre as previsões on-line fornecidos pela Vertex AI.

Escreva o código do modelo.

O modelo em si é quase semelhante ao modelo do Simples TFX Pipeline Tutorial .

Nós vamos adicionar _get_distribution_strategy() função que cria uma estratégia de distribuição TensorFlow e é usado em run_fn usar MirroredStrategy se GPU está disponível.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copie o arquivo do módulo para o GCS, que pode ser acessado a partir dos componentes do pipeline.

Caso contrário, você pode querer construir uma imagem de contêiner incluindo o arquivo do módulo e usar a imagem para executar o pipeline e jobs de treinamento do AI Platform.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Escreva uma definição de pipeline

Vamos definir uma função para criar um pipeline TFX. Ele tem os mesmos três componentes como no Simples TFX Pipeline Tutorial , mas usamos um Trainer e Pusher componente no módulo de extensão GCP.

tfx.extensions.google_cloud_ai_platform.Trainer se comporta como um regular Trainer , mas apenas se move o cálculo para o treinamento do modelo de nuvem. Ele inicia um trabalho personalizado no serviço Vertex AI Training e o componente do treinador no sistema de orquestração irá apenas esperar até que o trabalho do Vertex AI Training seja concluído.

tfx.extensions.google_cloud_ai_platform.Pusher cria um AI Modelo Vertex e Vertex AI Endpoint utilizando o modelo treinado.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Execute o pipeline em Vertex Pipelines.

Usaremos Vertex Pipelines para executar o gasoduto como fizemos no Simples TFX Pipeline para Vertex Pipelines Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

O arquivo de definição gerado pode ser enviado usando o cliente aiplatform Google Cloud no google-cloud-aiplatform pacote.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Agora você pode visitar 'Vertex AI> Pipelines' no Google Cloud Console para ver o progresso.

Teste com uma solicitação de previsão

Depois da conclusão do gasoduto, você vai encontrar um modelo implantado no um dos endpoints em 'Vertex AI> Endpoints. Precisamos saber a id do endpoint para enviar uma solicitação de predição para o novo endpoint. Isto é diferente do nome do endpoint entramos acima. Você pode encontrar o ID na página Endpoints no Google Cloud Console , parece que um número muito longo.

Defina ENDPOINT_ID abaixo antes de executá-lo.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Usamos o mesmo cliente da plataforma para enviar uma solicitação ao terminal. Enviaremos uma solicitação de previsão para classificação das espécies de pinguins. A entrada são os quatro recursos que usamos e o modelo retornará três valores, porque nosso modelo produz um valor para cada espécie.

Por exemplo, o exemplo específico a seguir tem o maior valor no índice '2' e imprimirá '2'.

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Para obter informações detalhadas sobre a previsão on-line, visite o página Endpoints no Google Cloud Console . você pode encontrar um guia sobre como enviar solicitações de amostra e links para mais recursos.

Execute o pipeline usando uma GPU

O Vertex AI oferece suporte ao treinamento usando vários tipos de máquina, incluindo suporte para GPUs. Veja referência Máquina de especificação de opções disponíveis.

Já definimos nosso pipeline para dar suporte ao treinamento em GPU. Tudo o que precisamos fazer é definir use_gpu bandeira para True. Em seguida, um gasoduto será criado com uma especificação de máquina, incluindo um NVIDIA_TESLA_K80 e nosso código de treinamento do modelo usará tf.distribute.MirroredStrategy .

Note-se que use_gpu bandeira não é uma parte da API Vertex ou TFX. Ele é usado apenas para controlar o código de treinamento neste tutorial.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Agora você pode visitar 'Vertex AI> Pipelines' no Google Cloud Console para ver o progresso.

Limpando

Você criou um modelo e ponto de extremidade Vertex AI neste tutorial. Por favor, apague esses recursos para evitar cobranças indesejadas, indo para Endpoints e undeploying o modelo a partir do ponto final em primeiro lugar. Em seguida, você pode excluir o ponto de extremidade e o modelo separadamente.