Using TensorFlow Recommenders with TFX

A tutorial to train a TensorFlow Recommenders ranking model as a TFX pipeline.

View on TensorFlow.org Run in Google Colab View source on GitHub Download notebook

In this notebook-based tutorial, we will create and run a TFX pipeline to train a ranking model to predict movie ratings using TensorFlow Recommenders (TFRS). The pipeline will consist of three essential TFX components: ExampleGen, Trainer and Pusher. The pipeline includes the most minimal ML workflow like importing data, training a model and exporting the trained TFRS ranking model.

Set Up

We first need to install the TFX Python package and download the dataset which we will use for our model.

Upgrade Pip

To avoid upgrading Pip in a system when running locally, check to make sure that we are running in Colab. Local systems can of course be upgraded separately.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Install TFX

pip install -U tfx
pip install -U tensorflow-recommenders

Did you restart the runtime?

If you are using Google Colab, the first time that you run the cell above, you must restart the runtime by clicking above "RESTART RUNTIME" button or using "Runtime > Restart runtime ..." menu. This is because of the way that Colab loads packages.

Before we define the pipeline, we need to write the model code for the Trainer component and save it in a file.

Check the TensorFlow and TFX versions.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.8.0
TFX version: 1.7.0

Set up variables

There are some variables used to define a pipeline. You can customize these variables as you want. By default all output from the pipeline will be generated under the current directory. Instead of using the SchemaGen component to generate a schema, for this tutorial we will create a hardcoded schema.

import os

PIPELINE_NAME = 'TFRS-ranking'

# Directory where MovieLens 100K rating data lives
DATA_ROOT = os.path.join('data', PIPELINE_NAME)
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Prepare example data

Since TFX does not currently support TensorFlow Datasets API, we will download the MovieLens 100K dataset manually for use in our TFX pipeline. The dataset we are using is MovieLens 100K Dataset.

There are four numeric features in this dataset:

  • userId
  • movieId
  • rating
  • timestamp

We will build a ranking model which predicts the rating of the movies. We will not use the timestamp feature.

Because TFX ExampleGen reads inputs from a directory, we need to create a directory and copy dataset to it.

wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
mkdir -p {DATA_ROOT}
unzip ml-100k.zip
echo 'userId,movieId,rating,timestamp' > {DATA_ROOT}/ratings.csv
sed 's/\t/,/g' ml-100k/u.data >> {DATA_ROOT}/ratings.csv
--2022-03-30 11:15:15--  https://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip’

ml-100k.zip         100%[===================>]   4.70M  2.53MB/s    in 1.9s    

2022-03-30 11:15:17 (2.53 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029]

Archive:  ml-100k.zip
   creating: ml-100k/
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base         
  inflating: ml-100k/u3.test         
  inflating: ml-100k/u4.base         
  inflating: ml-100k/u4.test         
  inflating: ml-100k/u5.base         
  inflating: ml-100k/u5.test         
  inflating: ml-100k/ua.base         
  inflating: ml-100k/ua.test         
  inflating: ml-100k/ub.base         
  inflating: ml-100k/ub.test

Take a quick look at the CSV file.

head {DATA_ROOT}/ratings.csv
userId,movieId,rating,timestamp
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817

You should be able to see four values. For example, the first example means user '196' gives a rating of 3 to movie '242'.

Create a pipeline

TFX pipelines are defined using Python APIs. We will define a pipeline which consists of following three components.

  • CsvExampleGen: Reads in data files and convert them to TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen which takes CSV file input.
  • Trainer: Trains an ML model. Trainer component requires a model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in a _savedmodel format.
  • Pusher: Copies the trained model outside of the TFX pipeline. Pusher component can be thought of an deployment process of the trained ML model.

Before actually define the pipeline, we need to write a model code for the Trainer component first.

Write model training code

We will build a simple ranking model to predict movie ratings. This model training code will be saved to a separate file.

In this tutorial we will use Generic Trainer of TFX which support Keras-based models. You need to write a Python file containing run_fn function, which is the entrypoint for the Trainer component.

_trainer_module_file = 'tfrs_ranking_trainer.py'

The ranking model we use is almost exactly the same as in the Basic Ranking tutorial. The only difference is that we use movie IDs instead of movie titles in the candidate tower.

%%writefile {_trainer_module_file}

from typing import Dict, Text
from typing import List

import numpy as np
import tensorflow as tf

from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_recommenders as tfrs
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = ['userId', 'movieId']
_LABEL_KEY = 'rating'

_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


class RankingModel(tf.keras.Model):

  def __init__(self):
    super().__init__()
    embedding_dimension = 32

    unique_user_ids = np.array(range(943)).astype(str)
    unique_movie_ids = np.array(range(1682)).astype(str)

    # Compute embeddings for users.
    self.user_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='userId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_user_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_user_ids) + 1, embedding_dimension)
    ])

    # Compute embeddings for movies.
    self.movie_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='movieId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_movie_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_movie_ids) + 1, embedding_dimension)
    ])

    # Compute predictions.
    self.ratings = tf.keras.Sequential([
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])

  def call(self, inputs):

    user_id, movie_id = inputs

    user_embedding = self.user_embeddings(user_id)
    movie_embedding = self.movie_embeddings(movie_id)

    return self.ratings(tf.concat([user_embedding, movie_embedding], axis=2))


class MovielensModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()
    self.ranking_model: tf.keras.Model = RankingModel()
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
    return self.ranking_model((features['userId'], features['movieId']))

  def compute_loss(self,
                   features: Dict[Text, tf.Tensor],
                   training=False) -> tf.Tensor:

    labels = features[1]
    rating_predictions = self(features[0])

    # The task computes the loss and the metrics.
    return self.task(labels=labels, predictions=rating_predictions)


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 256) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  return MovielensModel()


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=8192)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=4096)

  model = _build_keras_model()

  model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      epochs = 3,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  model.save(fn_args.serving_model_dir)
Writing tfrs_ranking_trainer.py

Now you have completed all preparation steps to build the TFX pipeline.

Write a pipeline definition

We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline which can be run using one of pipeline orchestration systems that TFX supports.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=12),
      eval_args=tfx.proto.EvalArgs(num_steps=24))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

Run the pipeline

TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner which is included in the TFX Python package and runs pipelines on local environment.

Now we create a LocalDagRunner and pass a Pipeline object created from the function we already defined.

The pipeline runs directly and you can see logs for the progress of the pipeline including ML model training.

tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/tmpfs/src/temp/docs/examples/tfrs_ranking_trainer.py' (including modules: ['tfrs_ranking_trainer']).
INFO:absl:User module package has hash fingerprint version bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.
INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '/tmp/tmpwvu8zivq/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp2408k6fv', '--dist-dir', '/tmp/tmpiuuzrc80']
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying tfrs_ranking_trainer.py -> build/lib
installing to /tmp/tmp2408k6fv
running install
running install_lib
copying build/lib/tfrs_ranking_trainer.py -> /tmp/tmp2408k6fv
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/setuptools/command/install.py:37: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools.
  setuptools.SetuptoolsDeprecationWarning,
INFO:absl:Successfully built user code wheel distribution at 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'; target user module is 'tfrs_ranking_trainer'.
INFO:absl:Full user module path is 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "Pusher"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.pusher.executor.Executor"
    }
  }
}
executor_specs {
  key: "Trainer"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.trainer.executor.GenericExecutor"
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metadata/TFRS-ranking/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "metadata/TFRS-ranking/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "TFRS-ranking"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-03-30T11:15:20.501436"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "TFRS-ranking.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data/TFRS-ranking"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1648638918,sum_checksum:1648638918"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'input_base': 'data/TFRS-ranking', 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1648638918,sum_checksum:1648638918'}, execution_output_uri='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/stateful_working_dir/2022-03-30T11:15:20.501436', tmp_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "TFRS-ranking"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-03-30T11:15:20.501436"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "TFRS-ranking.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data/TFRS-ranking"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "TFRS-ranking"
, pipeline_run_id='2022-03-30T11:15:20.501436')
INFO:absl:Generating examples.
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmp2408k6fv/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3.7.egg-info
running install_scripts
creating /tmp/tmp2408k6fv/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL
creating '/tmp/tmpiuuzrc80/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' and adding '/tmp/tmp2408k6fv' to it
adding 'tfrs_ranking_trainer.py'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/METADATA'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/top_level.txt'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/RECORD'
removing /tmp/tmp2408k6fv
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
INFO:absl:Processing input csv data data/TFRS-ranking/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0330 11:15:21.224762835   30409 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1648638918,sum_checksum:1648638918"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component Trainer is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.trainer.component.Trainer"
    base_type: TRAIN
  }
  id: "Trainer"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "TFRS-ranking"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-03-30T11:15:20.501436"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "TFRS-ranking.Trainer"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "TFRS-ranking"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-03-30T11:15:20.501436"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "TFRS-ranking.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "model"
    value {
      artifact_spec {
        type {
          name: "Model"
          base_type: MODEL
        }
      }
    }
  }
  outputs {
    key: "model_run"
    value {
      artifact_spec {
        type {
          name: "ModelRun"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "eval_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 24\n}"
      }
    }
  }
  parameters {
    key: "module_path"
    value {
      field_value {
        string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl"
      }
    }
  }
  parameters {
    key: "train_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 12\n}"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1648638918,sum_checksum:1648638918"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.0"
  }
}
state: LIVE
create_time_since_epoch: 1648638937044
last_update_time_since_epoch: 1648638937044
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Trainer:model_run:0"
  }
}
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Trainer:model:0"
  }
}
, artifact_type: name: "Model"
base_type: MODEL
)]}), exec_properties={'eval_args': '{\n  "num_steps": 24\n}', 'custom_config': 'null', 'train_args': '{\n  "num_steps": 12\n}', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'}, execution_output_uri='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Trainer/.system/stateful_working_dir/2022-03-30T11:15:20.501436', tmp_dir='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.trainer.component.Trainer"
    base_type: TRAIN
  }
  id: "Trainer"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "TFRS-ranking"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-03-30T11:15:20.501436"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "TFRS-ranking.Trainer"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "TFRS-ranking"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-03-30T11:15:20.501436"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "TFRS-ranking.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "model"
    value {
      artifact_spec {
        type {
          name: "Model"
          base_type: MODEL
        }
      }
    }
  }
  outputs {
    key: "model_run"
    value {
      artifact_spec {
        type {
          name: "ModelRun"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "eval_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 24\n}"
      }
    }
  }
  parameters {
    key: "module_path"
    value {
      field_value {
        string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl"
      }
    }
  }
  parameters {
    key: "train_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 12\n}"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "TFRS-ranking"
, pipeline_run_id='2022-03-30T11:15:20.501436')
INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:udf_utils.get_fn {'eval_args': '{\n  "num_steps": 24\n}', 'custom_config': 'null', 'train_args': '{\n  "num_steps": 12\n}', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'} 'run_fn'
INFO:absl:Installing 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '-m', 'pip', 'install', '--target', '/tmp/tmpr_mts7cq', 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl']
E0330 11:15:37.085941266   30409 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
Processing ./pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl
INFO:absl:Successfully installed 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature movieId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c
INFO:absl:Feature movieId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature movieId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature movieId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
  size: 1
}
. Setting to DenseTensor.
Epoch 1/3
12/12 [==============================] - 4s 227ms/step - root_mean_squared_error: 2.0349 - loss: 3.9171 - regularization_loss: 0.0000e+00 - total_loss: 3.9171 - val_root_mean_squared_error: 1.1201 - val_loss: 1.2280 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2280
Epoch 2/3
12/12 [==============================] - 2s 191ms/step - root_mean_squared_error: 1.1129 - loss: 1.2377 - regularization_loss: 0.0000e+00 - total_loss: 1.2377 - val_root_mean_squared_error: 1.1100 - val_loss: 1.2510 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2510
Epoch 3/3
12/12 [==============================] - 2s 204ms/step - root_mean_squared_error: 1.1065 - loss: 1.2216 - regularization_loss: 0.0000e+00 - total_loss: 1.2216 - val_root_mean_squared_error: 1.0965 - val_loss: 1.2165 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2165
WARNING:absl:Function `_wrapped_model` contains input name(s) movieId, userId with unsupported characters which will be renamed to movieid, userid in the SavedModel.
2022-03-30 11:15:49.914079: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
WARNING:absl:Found untraced functions such as ranking_layer_call_fn, ranking_layer_call_and_return_conditional_losses while saving (showing 2 of 2). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets
INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/TFRS-ranking/Trainer/model/2/Format-Serving. ModelRun written to pipelines/TFRS-ranking/Trainer/model_run/2
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Trainer:model_run:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.0"
  }
}
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.0"
  }
}
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "TFRS-ranking"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-03-30T11:15:20.501436"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "TFRS-ranking.Pusher"
      }
    }
  }
}
inputs {
  inputs {
    key: "model"
    value {
      channels {
        producer_node_query {
          id: "Trainer"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "TFRS-ranking"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-03-30T11:15:20.501436"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "TFRS-ranking.Trainer"
            }
          }
        }
        artifact_query {
          type {
            name: "Model"
            base_type: MODEL
          }
        }
        output_key: "model"
      }
    }
  }
}
outputs {
  outputs {
    key: "pushed_model"
    value {
      artifact_spec {
        type {
          name: "PushedModel"
          base_type: MODEL
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "push_destination"
    value {
      field_value {
        string_value: "{\n  \"filesystem\": {\n    \"base_directory\": \"serving_model/TFRS-ranking\"\n  }\n}"
      }
    }
  }
}
upstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/TFRS-ranking/Trainer/model/2"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.0"
  }
}
state: LIVE
create_time_since_epoch: 1648638950928
last_update_time_since_epoch: 1648638950928
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Pusher:pushed_model:0"
  }
}
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}), exec_properties={'push_destination': '{\n  "filesystem": {\n    "base_directory": "serving_model/TFRS-ranking"\n  }\n}', 'custom_config': 'null'}, execution_output_uri='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Pusher/.system/stateful_working_dir/2022-03-30T11:15:20.501436', tmp_dir='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "TFRS-ranking"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-03-30T11:15:20.501436"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "TFRS-ranking.Pusher"
      }
    }
  }
}
inputs {
  inputs {
    key: "model"
    value {
      channels {
        producer_node_query {
          id: "Trainer"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "TFRS-ranking"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-03-30T11:15:20.501436"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "TFRS-ranking.Trainer"
            }
          }
        }
        artifact_query {
          type {
            name: "Model"
            base_type: MODEL
          }
        }
        output_key: "model"
      }
    }
  }
}
outputs {
  outputs {
    key: "pushed_model"
    value {
      artifact_spec {
        type {
          name: "PushedModel"
          base_type: MODEL
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "push_destination"
    value {
      field_value {
        string_value: "{\n  \"filesystem\": {\n    \"base_directory\": \"serving_model/TFRS-ranking\"\n  }\n}"
      }
    }
  }
}
upstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "TFRS-ranking"
, pipeline_run_id='2022-03-30T11:15:20.501436')
WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.
INFO:absl:Model version: 1648638950
INFO:absl:Model written to serving path serving_model/TFRS-ranking/1648638950.
INFO:absl:Model pushed to pipelines/TFRS-ranking/Pusher/pushed_model/3.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3"
custom_properties {
  key: "name"
  value {
    string_value: "TFRS-ranking:2022-03-30T11:15:20.501436:Pusher:pushed_model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.0"
  }
}
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.

You should see "INFO:absl:Component Pusher is finished." at the end of the logs if the pipeline finished successfully. Because Pusher component is the last component of the pipeline.

The pusher component pushes the trained model to the SERVING_MODEL_DIR which is the serving_model/TFRS-ranking directory if you did not change the variables in the previous steps. You can see the result from the file browser in the left-side panel in Colab, or using the following command:

# List files in created model directory.
ls -R {SERVING_MODEL_DIR}
serving_model/TFRS-ranking:
1648638950

serving_model/TFRS-ranking/1648638950:
assets  keras_metadata.pb  saved_model.pb  variables

serving_model/TFRS-ranking/1648638950/assets:

serving_model/TFRS-ranking/1648638950/variables:
variables.data-00000-of-00001  variables.index
E0330 11:15:51.002207297   30409 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies

Now we can test the ranking model by computing predictions for a user and a movie:

import glob
# Load the latest model for testing
loaded = tf.saved_model.load(max(glob.glob(os.path.join(SERVING_MODEL_DIR, '*/')), key=os.path.getmtime))
print(loaded({'userId': [[42]], 'movieId': [[15]]}).numpy())
[[[3.7477503]]]

This concludes the TensorFlow Recommenders + TFX tutorial.