Lendo dados do BigQuery com TFX e Vertex Pipelines

Este tutorial baseado em notebook usará o Google Cloud BigQuery como fonte de dados para treinar um modelo de ML. O pipeline de ML será construído usando o TFX e executado no Google Cloud Vertex Pipelines.

Este notebook é baseado no pipeline TFX que construímos no Tutorial Simple TFX Pipeline for Vertex Pipelines . Se você ainda não leu esse tutorial, leia-o antes de continuar com este notebook.

O BigQuery é um data warehouse multinuvem sem servidor, altamente escalável e econômico, projetado para agilidade nos negócios. O TFX pode ser usado para ler dados de treinamento do BigQuery e publicar o modelo treinado no BigQuery.

Neste tutorial, usaremos o componente BigQueryExampleGen que lê dados do BigQuery para pipelines do TFX.

Este notebook deve ser executado no Google Colab ou no AI Platform Notebooks . Se você não estiver usando um desses, basta clicar no botão "Executar no Google Colab" acima.

Configurar

Se você concluiu o Tutorial Simple TFX Pipeline for Vertex Pipelines , você terá um projeto GCP funcional e um bucket GCS e isso é tudo o que precisamos para este tutorial. Por favor, leia o tutorial preliminar primeiro se você perdeu.

Instalar pacotes python

Instalaremos os pacotes Python necessários, incluindo TFX e KFP, para criar pipelines de ML e enviar trabalhos para Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Você reiniciou o tempo de execução?

Se você estiver usando o Google Colab, na primeira vez que executar a célula acima, deverá reiniciar o runtime clicando no botão acima "RESTART RUNTIME" ou usando o menu "Runtime > Restart runtime...". Isso ocorre devido à maneira como o Colab carrega os pacotes.

Se você não estiver no Colab, poderá reiniciar o tempo de execução com a seguinte célula.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Faça login no Google para este notebook

Se você estiver executando este notebook no Colab, autentique-se com sua conta de usuário:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Se você estiver no AI Platform Notebooks , autentique-se no Google Cloud antes de executar a próxima seção, executando

gcloud auth login

na janela Terminal (que você pode abrir via Arquivo > Novo no menu). Você só precisa fazer isso uma vez por instância de notebook.

Verifique as versões do pacote.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Configurar variáveis

Vamos configurar algumas variáveis ​​usadas para customizar os pipelines abaixo. As seguintes informações são necessárias:

Insira os valores necessários na célula abaixo antes de executá-la .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Defina o gcloud para usar seu projeto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Por padrão, o Vertex Pipelines usa a conta de serviço GCE VM padrão do formato [project-number]-compute@developer.gserviceaccount.com . Precisamos dar permissão para usar o BigQuery a essa conta para acessar o BigQuery no pipeline. Adicionaremos a função 'Usuário do BigQuery' à conta.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Consulte a documentação do Vertex para saber mais sobre contas de serviço e configuração do IAM.

Criar um pipeline

Os pipelines do TFX são definidos usando APIs do Python, como fizemos no Tutorial Simple TFX Pipeline for Vertex Pipelines . Anteriormente, usamos CsvExampleGen que lê dados de um arquivo CSV. Neste tutorial, usaremos o componente BigQueryExampleGen que lê dados do BigQuery.

Preparar consulta do BigQuery

Usaremos o mesmo conjunto de dados do Palmer Penguins . No entanto, vamos lê-lo de uma tabela do BigQuery tfx-oss-public.palmer_penguins.palmer_penguins que é preenchida usando o mesmo arquivo CSV.

Se você estiver usando o Google Colab, poderá examinar o conteúdo da tabela do BigQuery diretamente.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Todas as características já foram normalizadas para 0~1, exceto a species que é o rótulo. Construiremos um modelo de classificação que prevê as species de pinguins.

BigQueryExampleGen requer uma consulta para especificar quais dados buscar. Como usaremos todos os campos de todas as linhas da tabela, a consulta é bastante simples. Você também pode especificar nomes de campos e adicionar condições WHERE conforme necessário de acordo com a sintaxe SQL padrão do BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Escreva o código do modelo.

Usaremos o mesmo código de modelo do Tutorial Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copie o arquivo do módulo para o GCS, que pode ser acessado a partir dos componentes do pipeline. Como o treinamento de modelo acontece no GCP, precisamos fazer upload dessa definição de modelo.

Caso contrário, convém criar uma imagem de contêiner incluindo o arquivo de módulo e usar a imagem para executar o pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Escreva uma definição de pipeline

Vamos definir uma função para criar um pipeline TFX. Precisamos usar BigQueryExampleGen , que recebe a query como argumento. Mais uma mudança em relação ao tutorial anterior é que precisamos passar beam_pipeline_args que é passado para os componentes quando eles são executados. Usaremos beam_pipeline_args para passar parâmetros adicionais ao BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Execute o pipeline no Vertex Pipelines.

Usaremos o Vertex Pipelines para executar o pipeline, como fizemos no Tutorial Simple TFX Pipeline for Vertex Pipelines .

Também precisamos passar beam_pipeline_args para BigQueryExampleGen. Inclui configurações como o nome do projeto do GCP e o armazenamento temporário para a execução do BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

O arquivo de definição gerado pode ser enviado usando o cliente kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Agora você pode visitar 'Vertex AI > Pipelines' no Google Cloud Console para ver o progresso.