自定义 Model.fit 的内容

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

简介

您在进行监督学习时可以使用 fit(),一切都可以顺利完成。

需要从头开始编写自己的训练循环时,您可以使用 GradientTape 并控制每个微小的细节。

但如果您需要自定义训练算法,又想从 fit() 的便捷功能(例如回调、内置分布支持或步骤融合)中受益,那么该怎么做?

Keras 的核心原则是渐进式呈现复杂性。您应当始终能够以渐进的方式习惯较低级别的工作流。如果高级功能并不完全符合您的用例,那么您就不应深陷其中。您应当能够从容地控制微小的细节,同时保留与之相称的高级便利性。

需要自定义 fit() 的功能时,您应重写 Model 类的训练步骤函数。此函数是 fit() 会针对每批次数据调用的函数。然后,您将能够像往常一样调用 fit(),它将运行您自己的学习算法。

请注意,此模式不会妨碍您使用函数式 API 构建模型。无论是构建 Sequential 模型、函数式 API 模型还是子类模型,均可采用这种模式。

让我们了解一下它的工作方式。

设置

需要 TensorFlow 2.2 或更高版本。

import tensorflow as tf
from tensorflow import keras
2022-12-14 21:58:38.950990: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-12-14 21:58:38.951077: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2022-12-14 21:58:38.951086: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

第一个简单的示例

让我们从一个简单的示例开始:

  • 创建一个将 keras.Model 子类化的新类。
  • 仅重写 train_step(self, data) 方法。
  • 返回一个将指标名称(包括损失)映射到其当前值的字典。

输入参数 data 是传递以拟合训练数据的数据:

  • 如果通过调用 fit(x, y, ...) 传递 Numpy 数组,则 data 将为元祖 (x, y)
  • 如果通过调用 fit(dataset, ...) 传递 tf.data.Dataset,则 data 将为每批次 dataset 产生的数据。

我们在 train_step 方法的主体中实现了定期的训练更新,类似于您已经熟悉的内容。重要的是,我们通过 self.compiled_loss 计算损失,它会封装传递给 compile() 的损失函数。

同样,我们调用 self.compiled_metrics.update_state(y, y_pred) 来更新在 compile() 中传递的指标的状态,并在最后从 self.metrics 中查询结果以检索其当前值。

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

我们来试一下:

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 1.8958 - mae: 1.2818
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.8889 - mae: 0.8171
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5347
<keras.callbacks.History at 0x7f9f90038e20>

在更低级别上操作

当然,您可以直接跳过在 compile() 中传递损失函数,而在 train_step手动完成所有内容。指标也是如此。

以下是一个较低级别的示例,仅使用 compile() 配置优化器:

  • 我们从创建 Metric 实例以跟踪我们的损失和 MAE 得分开始。
  • 我们实现可更新这些指标状态(通过对指标调用 update_state())的自定义 train_step() ,然后对其进行查询(通过 result())以返回其当前平均值,由进度条显示并传递给任何回调。
  • 请注意,需要在每个周期之间对指标调用 reset_states()!否则,调用 result() 会返回自训练开始以来的平均值,但我们通常要使用的是每个周期的平均值。幸运的是,该框架可以帮助我们实现:只需在模型的 metrics 属性中列出要重置的任何指标。模型将在每个 fit() 周期开始时或在开始调用 evaluate() 时对其中列出的任何对象调用 reset_states()
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2442 - mae: 0.3902
Epoch 2/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2286 - mae: 0.3773
Epoch 3/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2163 - mae: 0.3672
Epoch 4/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2047 - mae: 0.3578
Epoch 5/5
32/32 [==============================] - 0s 2ms/step - loss: 0.1942 - mae: 0.3491
<keras.callbacks.History at 0x7f9f6c339f70>

支持 sample_weightclass_weight

您可能已经注意到,我们的第一个基本示例并没有提及样本加权。如果要支持 fit() 参数 sample_weightclass_weight,只需执行以下操作:

  • data 参数中解包 sample_weight
  • 将其传递给 compiled_losscompiled_metrics(当然,如果您不依赖 compile() 来获取损失和指标,也可以手动应用)
  • 就是这么简单。
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.3687 - mae: 0.7165
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1641 - mae: 0.4603
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1271 - mae: 0.4127
<keras.callbacks.History at 0x7f9f6c2f40d0>

提供您自己的评估步骤

如何对调用 model.evaluate() 进行相同的处理?您需要以完全相同的方式重写 test_step。如下所示:

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 2ms/step - loss: 0.4197 - mae: 0.5246
[0.4196917414665222, 0.5245950818061829]

总结:端到端 GAN 示例

让我们看一个利用您刚刚所学全部内容的端到端示例。

请考虑:

  • 旨在生成 28x28x1 图像的生成器网络。
  • 旨在将 28x28x1 图像分为两类(“fake”和“real”)的鉴别器网络。
  • 分别用于两个网络的优化器。
  • 训练鉴别器的损失函数。
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

这是特征齐全的 GAN 类,重写了 compile() 以使用其自己的签名,并在 train_step 的 17 行中实现了整个 GAN 算法:

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

让我们对其进行测试:

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 5s 15ms/step - d_loss: 0.5041 - g_loss: 0.9849
<keras.callbacks.History at 0x7f9f6c0f9370>

深度学习背后的思想十分简单,那么实现又何必复杂呢?