Introduction au pipeline de classement TensorFlow

TL;DR : Réduire le code passe-partout pour créer, former et servir des modèles TensorFlow Ranking avec TensorFlow Ranking Pipelines ; Utilisez des stratégies distribuées appropriées pour les applications de classement à grande échelle, en fonction du cas d'utilisation et des ressources.

Introduction

TensorFlow Ranking Pipeline se compose d'une série de processus de traitement de données, de création de modèles, de formation et de diffusion qui vous permettent de construire, de former et de diffuser des modèles de classement évolutifs basés sur un réseau neuronal à partir de journaux de données avec un minimum d'efforts. Le pipeline est plus efficace lorsque le système évolue. En général, si l'exécution de votre modèle prend 10 minutes ou plus sur une seule machine, envisagez d'utiliser cette infrastructure de pipeline pour répartir la charge et accélérer le traitement.

Le pipeline de classement TensorFlow a été exécuté de manière constante et stable dans le cadre d'expériences et de productions à grande échelle avec du Big Data (plus de téraoctets) et de gros modèles (plus de 100 millions de FLOP) sur des systèmes distribués (1 000 CPU et plus et plus de 100 GPU et TPU). Une fois qu'un modèle TensorFlow est éprouvé avec model.fit sur une petite partie des données, le pipeline est recommandé pour l'analyse des hyperparamètres, la formation continue et d'autres situations à grande échelle.

Pipeline de classement

Dans TensorFlow, un pipeline typique pour créer, former et servir un modèle de classement comprend les étapes typiques suivantes.

  • Définir la structure du modèle :
    • Créer des entrées ;
    • Créer des couches de prétraitement ;
    • Créer une architecture de réseau neuronal ;
  • Modèle de train :
    • Générer des ensembles de données d'entraînement et de validation à partir des journaux de données ;
    • Préparez le modèle avec les hyper-paramètres appropriés :
      • Optimiseur ;
      • Pertes de classement ;
      • Mesures de classement ;
    • Configurez des stratégies distribuées pour vous entraîner sur plusieurs appareils.
    • Configurez les rappels pour diverses comptabilités.
    • Modèle d'exportation pour le service ;
  • Servir le modèle :
    • Déterminer le format des données au moment de la diffusion ;
    • Choisissez et chargez le modèle formé ;
    • Processus avec le modèle chargé.

L'un des principaux objectifs du pipeline TensorFlow Ranking est de réduire le code passe-partout dans les étapes, telles que le chargement et le prétraitement des ensembles de données, la compatibilité des données par liste et de la fonction de notation ponctuelle, ainsi que l'exportation du modèle. L'autre objectif important est d'imposer la conception cohérente de nombreux processus intrinsèquement corrélés, par exemple, les entrées du modèle doivent être compatibles à la fois avec les ensembles de données d'entraînement et le format des données au moment de la diffusion.

Guide d'utilisation

Avec toute la conception ci-dessus, le lancement d'un modèle de classement TF se déroule selon les étapes suivantes, comme le montre la figure 1.

Schéma du pipeline de classement TensorFlow
Figure 1 : Diagramme des classes TensorFlow Ranking et étapes pour entraîner des modèles de classement avec le pipeline TF Ranking. Les modules verts peuvent être personnalisés pour votre modèle de classement.

Exemple utilisant un réseau de neurones distribué

Dans cet exemple, vous exploiterez les tfr.keras.model.FeatureSpecInputCreator , tfr.keras.pipeline.SimpleDatasetBuilder et tfr.keras.pipeline.SimplePipeline intégrés qui intègrent feature_spec pour définir de manière cohérente les fonctionnalités d'entrée dans les entrées du modèle et serveur de jeux de données. La version notebook avec une procédure pas à pas peut être trouvée dans le didacticiel de classement distribué .

Définissez d’abord les feature_spec pour les fonctionnalités de contexte et d’exemple.

context_feature_spec = {}
example_feature_spec = {
    'custom_features_{}'.format(i + 1):
    tf.io.FixedLenFeature(shape=(1,), dtype=tf.float32, default_value=0.0)
    for i in range(10)
}
label_spec = ('utility', tf.io.FixedLenFeature(
    shape=(1,), dtype=tf.float32, default_value=-1))

Suivez les étapes illustrées dans la figure 1 :
Définissez input_creator à partir de feature_spec s.

input_creator = tfr.keras.model.FeatureSpecInputCreator(
    context_feature_spec, example_feature_spec)

Définissez ensuite les transformations d’entités de prétraitement pour le même ensemble d’entités en entrée.

def log1p(tensor):
    return tf.math.log1p(tensor * tf.sign(tensor)) * tf.sign(tensor)
preprocessor = {
    'custom_features_{}'.format(i + 1): log1p
    for i in range(10)
}

Définissez un marqueur avec le modèle DNN à rétroaction intégré.

dnn_scorer = tfr.keras.model.DNNScorer(
    hidden_layer_dims=[1024, 512, 256],
    output_units=1,
    activation=tf.nn.relu,
    use_batch_norm=True,
    batch_norm_moment=0.99,
    dropout=0.4)

Créez le model_builder avec input_creator , preprocessor et scorer .

model_builder = tfr.keras.model.ModelBuilder(
    input_creator=input_creator,
    preprocessor=preprocessor,
    scorer=dnn_scorer,
    mask_feature_name='__list_mask__',
    name='web30k_dnn_model')

Définissez maintenant les hyperparamètres pour dataset_builder .

dataset_hparams = tfr.keras.pipeline.DatasetHparams(
    train_input_pattern='/path/to/MSLR-WEB30K-ELWC/train-*',
    valid_input_pattern='/path/to/MSLR-WEB30K-ELWC/vali-*',
    train_batch_size=128,
    valid_batch_size=128,
    list_size=200,
    dataset_reader=tf.data.RecordIODataset,
    convert_labels_to_binary=False)

Créez le dataset_builder .

tfr.keras.pipeline.SimpleDatasetBuilder(
    context_feature_spec=context_feature_spec,
    example_feature_spec=example_feature_spec,
    mask_feature_name='__list_mask__',
    label_spec=label_spec,
    hparams=dataset_hparams)

Définissez également les hyperparamètres du pipeline.

pipeline_hparams = tfr.keras.pipeline.PipelineHparams(
    model_dir='/tmp/web30k_dnn_model',
    num_epochs=100,
    num_train_steps=100000,
    num_valid_steps=100,
    loss='softmax_loss',
    loss_reduction=tf.losses.Reduction.AUTO,
    optimizer='adam',
    learning_rate=0.0001,
    steps_per_execution=100,
    export_best_model=True,
    strategy='MirroredStrategy',
    tpu=None)

Créez le ranking_pipeline et entraînez-vous.

ranking_pipeline = tfr.keras.pipeline.SimplePipeline(
    model_builder=model_builder,
    dataset_builder=dataset_builder,
    hparams=pipeline_hparams,
)
ranking_pipeline.train_and_validate()

Conception du pipeline de classement TensorFlow

Le pipeline de classement TensorFlow permet de gagner du temps d'ingénierie grâce au code passe-partout, tout en offrant une flexibilité de personnalisation grâce au remplacement et au sous-classement. Pour y parvenir, le pipeline introduit les classes personnalisables tfr.keras.model.AbstractModelBuilder , tfr.keras.pipeline.AbstractDatasetBuilder et tfr.keras.pipeline.AbstractPipeline pour configurer le pipeline TensorFlow Ranking.

Conception de classes de pipeline de classement TensorFlow
Figure 2 : Conception globale des classes du pipeline de classement TensorFlow.

Générateur de modèles

Le code passe-partout lié à la construction du modèle Keras est intégré dans AbstractModelBuilder , qui est transmis au AbstractPipeline et appelé à l'intérieur du pipeline pour créer le modèle dans le cadre de la stratégie. Ceci est illustré dans la figure 1. Les méthodes de classe sont définies dans la classe de base abstraite.

class AbstractModelBuilder:
  def __init__(self, mask_feature_name, name):

  @abstractmethod
  def create_inputs(self):
    // To create tf.keras.Input. Abstract method, to be overridden.
    ...
  @abstractmethod
  def preprocess(self, context_inputs, example_inputs, mask):
    // To preprocess input features. Abstract method, to be overridden.
    ...
  @abstractmethod
  def score(self, context_features, example_features, mask):
    // To score based on preprocessed features. Abstract method, to be overridden.
    ...
  def build(self):
    context_inputs, example_inputs, mask = self.create_inputs()
    context_features, example_features = self.preprocess(
        context_inputs, example_inputs, mask)
    logits = self.score(context_features, example_features, mask)
    return tf.keras.Model(inputs=..., outputs=logits, name=self._name)

Vous pouvez directement sous-classer AbstractModelBuilder et l'écraser avec les méthodes concrètes de personnalisation, comme

class MyModelBuilder(AbstractModelBuilder):
  def create_inputs(self, ...):
  ...

Dans le même temps, vous devez utiliser ModelBuilder avec les fonctionnalités d'entrée, les transformations de prétraitement et les fonctions de notation spécifiées comme entrées de fonction input_creator , preprocessor et scorer dans la classe init au lieu de sous-classer.

class ModelBuilder(AbstractModelBuilder):
  def __init__(self, input_creator, preprocessor, scorer, mask_feature_name, name):
  ...

Pour réduire les passe-partout liés à la création de ces entrées, les classes de fonctions tfr.keras.model.InputCreator pour input_creator , tfr.keras.model.Preprocessor pour preprocessor et tfr.keras.model.Scorer pour scorer sont fournies, ainsi que des sous-classes concrètes tfr.keras.model.FeatureSpecInputCreator , tfr.keras.model.TypeSpecInputCreator , tfr.keras.model.PreprocessorWithSpec , tfr.keras.model.UnivariateScorer , tfr.keras.model.DNNScorer et tfr.keras.model.GAMScorer . Ceux-ci devraient couvrir la plupart des cas d’utilisation courants.

Notez que ces classes de fonctions sont des classes Keras, aucune sérialisation n'est donc nécessaire. Le sous-classement est la méthode recommandée pour les personnaliser.

Générateur d'ensembles de données

La classe DatasetBuilder collecte un passe-partout lié aux ensembles de données. Les données sont transmises au Pipeline et appelées pour servir les ensembles de données de formation et de validation et pour définir les signatures de service pour les modèles enregistrés. Comme le montre la figure 1, les méthodes DatasetBuilder sont définies dans la classe de base tfr.keras.pipeline.AbstractDatasetBuilder ,

class AbstractDatasetBuilder:

  @abstractmethod
  def build_train_dataset(self, *arg, **kwargs):
    // To return the training dataset.
    ...
  @abstractmethod
  def build_valid_dataset(self, *arg, **kwargs):
    // To return the validation dataset.
    ...
  @abstractmethod
  def build_signatures(self, *arg, **kwargs):
    // To build the signatures to export saved model.
    ...

Dans une classe DatasetBuilder concrète, vous devez implémenter build_train_datasets , build_valid_datasets et build_signatures .

Une classe concrète qui crée des ensembles de données à partir de feature_spec s est également fournie :

class BaseDatasetBuilder(AbstractDatasetBuilder):

  def __init__(self, context_feature_spec, example_feature_spec,
               training_only_example_spec,
               mask_feature_name, hparams,
               training_only_context_spec=None):
    // Specify label and weight specs in training_only_example_spec.
    ...
  def _features_and_labels(self, features):
    // To split the labels and weights from input features.
    ...

  def _build_dataset(self, ...):
    return tfr.data.build_ranking_dataset(
        context_feature_spec+training_only_context_spec,
        example_feature_spec+training_only_example_spec, mask_feature_name, ...)

  def build_train_dataset(self):
    return self._build_dataset(...)

  def build_valid_dataset(self):
    return self._build_dataset(...)

  def build_signatures(self, model):
    return saved_model.Signatures(model, context_feature_spec,
                                  example_feature_spec, mask_feature_name)()

Les hparams utilisés dans DatasetBuilder sont spécifiés dans la classe de données tfr.keras.pipeline.DatasetHparams .

Pipeline

Le pipeline de classement est basé sur la classe tfr.keras.pipeline.AbstractPipeline :

class AbstractPipeline:

  @abstractmethod
  def build_loss(self):
    // Returns a tf.keras.losses.Loss or a dict of Loss. To be overridden.
    ...
  @abstractmethod
  def build_metrics(self):
    // Returns a list of evaluation metrics. To be overridden.
    ...
  @abstractmethod
  def build_weighted_metrics(self):
    // Returns a list of weighted metrics. To be overridden.
    ...
  @abstractmethod
  def train_and_validate(self, *arg, **kwargs):
    // Main function to run the training pipeline. To be overridden.
    ...

Une classe de pipeline concrète qui entraîne le modèle avec différents tf.distribute.strategy compatibles avec model.fit est également fournie :

class ModelFitPipeline(AbstractPipeline):

  def __init__(self, model_builder, dataset_builder, hparams):
    ...
  def build_callbacks(self):
    // Builds callbacks used in model.fit. Override for customized usage.
    ...
  def export_saved_model(self, model, export_to, checkpoint=None):
    if checkpoint:
      model.load_weights(checkpoint)
    model.save(export_to, signatures=dataset_builder.build_signatures(model))

  def train_and_validate(self, verbose=0):
    with self._strategy.scope():
      model = model_builder.build()
      model.compile(
          optimizer,
          loss=self.build_loss(),
          metrics=self.build_metrics(),
          loss_weights=self.hparams.loss_weights,
          weighted_metrics=self.build_weighted_metrics())
      train_dataset, valid_dataset = (
          dataset_builder.build_train_dataset(),
          dataset_builder.build_valid_dataset())
      model.fit(
          x=train_dataset,
          validation_data=valid_dataset,
          callbacks=self.build_callbacks(),
          verbose=verbose)
      self.export_saved_model(model, export_to=model_output_dir)

Les hparams utilisés dans tfr.keras.pipeline.ModelFitPipeline sont spécifiés dans la classe de données tfr.keras.pipeline.PipelineHparams . Cette classe ModelFitPipeline est suffisante pour la plupart des cas d’utilisation de TF Ranking. Les clients peuvent facilement le sous-classer à des fins spécifiques.

Prise en charge de la stratégie distribuée

Veuillez vous référer à la formation distribuée pour une introduction détaillée des stratégies distribuées prises en charge par TensorFlow. Actuellement, le pipeline TensorFlow Ranking prend en charge tf.distribute.MirroredStrategy (par défaut), tf.distribute.TPUStrategy , tf.distribute.MultiWorkerMirroredStrategy et tf.distribute.ParameterServerStrategy . La stratégie miroir est compatible avec la plupart des systèmes à machine unique. Veuillez définir strategy sur None pour aucune stratégie distribuée.

En général, MirroredStrategy fonctionne pour des modèles relativement petits sur la plupart des appareils dotés d'options CPU et GPU. MultiWorkerMirroredStrategy fonctionne pour les grands modèles qui ne tiennent pas dans un seul travailleur. ParameterServerStrategy effectue une formation asynchrone et nécessite la disponibilité de plusieurs travailleurs. TPUStrategy est idéal pour les gros modèles et le big data lorsque les TPU sont disponibles, cependant, il est moins flexible en termes de formes de tenseurs qu'il peut gérer.

FAQ

  1. L'ensemble minimal de composants pour utiliser le RankingPipeline
    Voir l'exemple de code ci-dessus.

  2. Et si j'ai mon propre model Keras
    Pour être formé avec les stratégies tf.distribute , model doit être construit avec toutes les variables entraînables définies sous stratégie.scope(). Alors enveloppez votre modèle dans ModelBuilder comme suit :

class MyModelBuilder(AbstractModelBuilder):
  def __init__(self, model, context_feature_names, example_feature_names,
               mask_feature_name, name):
    super().__init__(mask_feature_name, name)
    self._model = model
    self._context_feature_names = context_feature_names
    self._example_feature_names = example_feature_names

  def create_inputs(self):
    inputs = self._model.input
    context_inputs = {inputs[name] for name in self._context_feature_names}
    example_inputs = {inputs[name] for name in self._example_feature_names}
    mask = inputs[self._mask_feature_name]
    return context_inputs, example_inputs, mask

  def preprocess(self, context_inputs, example_inputs, mask):
    return context_inputs, example_inputs, mask

  def score(self, context_features, example_features, mask):
    inputs = dict(
        list(context_features.items()) + list(example_features.items()) +
        [(self._mask_feature_name, mask)])
    return self._model(inputs)

model_builder = MyModelBuilder(model, context_feature_names, example_feature_names,
                               mask_feature_name, "my_model")

Introduisez ensuite ce model_builder dans le pipeline pour une formation ultérieure.