Writing a training loop from scratch

Setup

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

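Introduction

Keras provides default training and evaluation loops, fit() and evaluate(). Their usage is covered in the guide Training & evaluation with the built-in methods.

If you want to customize the learning algorithm of your model while still leveraging the convenience of fit() (for instance, to train a GAN using fit()), you can subclass the Model class and implement your own train_step() method, which is called repeatedly during fit(). This is covered in the guide Customizing what happens in fit().

Now, if you want very low-level control over training and evaluation, you should write your own training and evaluation loops from scratch. This is what this guide is about.

Using the GradientTape: a first end-to-end example

Calling a model inside a GradientTape scope enables you to retrieve the gradients of a loss value with respect to the trainable weights of the layers. Using an optimizer instance, you can use these gradients to update these variables (which you can retrieve using model.trainable_weights).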
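
As a minimal sketch of this mechanism outside of Keras, the snippet below differentiates a toy loss with respect to a single variable and applies one gradient descent update by hand. The variable w, the toy loss w * w, and the learning rate are illustrative assumptions rather than part of this guide, and the manual assign_sub call only stands in for the kind of update a plain SGD optimizer performs.

```python
import tensorflow as tf

# Hypothetical toy setup: one trainable variable and the loss w * w.
w = tf.Variable(3.0)
learning_rate = 0.1

# Operations executed inside the scope are recorded on the tape.
with tf.GradientTape() as tape:
    loss = w * w

# d(loss)/dw = 2 * w, evaluated at w = 3.0.
grad = tape.gradient(loss, w)

# One step of plain gradient descent: w <- w - learning_rate * grad.
w.assign_sub(learning_rate * grad)

print(float(grad), float(w.numpy()))
```

The gradient is 6.0, so the update moves w from 3.0 to about 2.4. The training loops in this guide follow the same pattern, with model.trainable_weights in place of w and an optimizer in place of the manual update.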

Let's consider a simple MNIST model:

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

Let's train it using mini-batch gradient descent with a custom training loop.

First, we need an optimizer, a loss function, and a dataset:

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
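
The loss above is created with from_logits=True because the last Dense layer of the model has no softmax activation and therefore outputs raw scores. As a rough sanity check of what that flag implies, the sketch below compares the Keras loss with a manual softmax cross-entropy on a single made-up sample; the logits and the label are hypothetical values chosen for illustration.

```python
import math

import tensorflow as tf

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Hypothetical batch with one sample: raw scores for 3 classes, true class 0.
logits = tf.constant([[2.0, 1.0, 0.1]])
labels = tf.constant([0])

keras_loss = float(loss_fn(labels, logits))

# Manual computation: softmax over the logits, then -log(p[true class]).
exps = [math.exp(v) for v in (2.0, 1.0, 0.1)]
manual_loss = -math.log(exps[0] / sum(exps))

print(keras_loss, manual_loss)
```

Both values agree up to floating-point precision, which shows that the loss applies the softmax itself when from_logits=True.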

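Here's our training loop:

  • We open a for loop that iterates over epochs
  • For each epoch, we open a for loop that iterates over the dataset, in batches
  • For each batch, we open a GradientTape() scope
  • Inside this scope, we call the model (forward pass) and compute the loss
  • Outside the scope, we retrieve the gradients of the loss with respect to the weights of the model
  • Finally, we use the optimizer to update the weights of the model based on the gradients
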
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the model.
            # The operations that its layers apply
            # to their inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the loss with respect to the trainable variables.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0
Training loss (for one batch) at step 0: 97.0760
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.0666
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.1811
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 1.8537
Seen so far: 38464 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.9178
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.2292
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.7191
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5821
Seen so far: 38464 samples
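
The sample counts in this log follow directly from the batch size. The short sketch below reproduces that arithmetic, assuming the standard 60,000-sample MNIST training set minus the 10,000 samples reserved for validation; the helper seen_so_far is a hypothetical name that mirrors the expression used in the print call.

```python
import math

num_train_samples = 60000 - 10000  # MNIST training set minus the validation split
batch_size = 64

# The dataset is batched without dropping the remainder, so the last batch is smaller.
steps_per_epoch = math.ceil(num_train_samples / batch_size)
last_batch_size = num_train_samples - (steps_per_epoch - 1) * batch_size


def seen_so_far(step):
    # Same expression as in the logging code: (step + 1) * batch_size.
    return (step + 1) * batch_size


print(steps_per_epoch, last_batch_size, seen_so_far(600))
```

An epoch therefore has 782 steps, the last of which holds only 16 samples. The logged count is exact at steps 0, 200, 400 and 600 because all of those batches are full.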

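Low-level handling of metrics

Let's add metrics monitoring to this basic loop.

You can readily reuse the built-in metrics (or custom ones you wrote) in such training loops written from scratch. Here's the flow:

  • Instantiate the metric at the start of the loop
  • Call metric.update_state() after each batch
  • Call metric.result() when you need to display the current value of the metric
  • Call metric.reset_states() when you need to clear the state of the metric (typically at the end of an epoch); newer TensorFlow releases name this method metric.reset_state()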
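
To make this flow concrete, here is a small sketch that runs a metric over two hand-made batches, outside of any training loop. The labels and logits are hypothetical values. Because the reset method is called reset_state() in newer TensorFlow releases and reset_states() in older ones, the sketch looks up whichever one is available, which is a convenience for this example rather than a recommended pattern.

```python
import tensorflow as tf

metric = tf.keras.metrics.SparseCategoricalAccuracy()

# First hypothetical batch: the argmax predictions are [0, 0], so one of two is correct.
metric.update_state(tf.constant([0, 1]), tf.constant([[2.0, 0.1], [1.5, 0.3]]))
acc_after_one = float(metric.result())

# Second batch: the argmax predictions are [1, 1], so both are correct.
metric.update_state(tf.constant([1, 1]), tf.constant([[0.0, 1.0], [0.2, 3.0]]))
acc_after_two = float(metric.result())

# Clear the accumulated state, using whichever reset method this version provides.
reset = getattr(metric, "reset_state", None) or metric.reset_states
reset()
acc_after_reset = float(metric.result())

print(acc_after_one, acc_after_two, acc_after_reset)
```

The result is a running value over everything seen since the last reset: 0.5 after the first batch, 0.75 after the second (3 correct out of 4), and 0.0 once the state is cleared.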

Let's use this knowledge to compute SparseCategoricalAccuracy on validation data at the end of each epoch:

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

Here's our training and evaluation loop:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 96.9259
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.4109
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5330
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5641
Seen so far: 38464 samples
Training acc over epoch: 0.7641
Validation acc: 0.7811
Time taken: 5.66s

Start of epoch 1
Training loss (for one batch) at step 0: 0.6936
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.5072
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5011
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.3613
Seen so far: 38464 samples
Training acc over epoch: 0.8634
Validation acc: 0.8817
Time taken: 5.56s

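Speeding up your training step with tf.function

The default runtime in TensorFlow 2.0 is eager execution. As such, our training loop above executes eagerly.

This is great for debugging, but graph compilation has a definite performance advantage. Describing your computation as a static graph enables the framework to apply global performance optimizations. This is impossible when the framework is constrained to greedily execute one operation after another, with no knowledge of what comes next.

You can compile into a static graph any function that takes tensors as input. Just add a @tf.function decorator on it, like this: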

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

Let's do the same with the evaluation step:

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

Now, let's re-run our training loop with this compiled training step:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 0.6896
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.6898
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.4330
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.2993
Seen so far: 38464 samples
Training acc over epoch: 0.8877
Validation acc: 0.8907
Time taken: 1.31s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5393
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.3981
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.4990
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.3251
Seen so far: 38464 samples
Training acc over epoch: 0.9019
Validation acc: 0.8986
Time taken: 1.05s

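Much faster, right? In the runs shown above, the time per epoch drops from about 5.6 s to between 1.0 and 1.3 s.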
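
To see what the @tf.function decorator does in isolation, the sketch below decorates a small function and records how often its Python body actually runs. The function scaled_sum and the list trace_log are hypothetical names introduced for this illustration. The Python-level append runs only while TensorFlow traces the function into a graph, so repeated calls with tensors of the same shape and dtype reuse the compiled graph.

```python
import tensorflow as tf

trace_log = []


@tf.function
def scaled_sum(x, y):
    # Python side effect: executes only while the function is being traced.
    trace_log.append("traced")
    return tf.reduce_sum(2.0 * x + y)


a = tf.constant([1.0, 2.0])
b = tf.constant([3.0, 4.0])

first = scaled_sum(a, b)   # traces the function, then runs the graph
second = scaled_sum(b, a)  # same input signature: reuses the existing graph

print(float(first), float(second), len(trace_log))
```

The two calls return 13.0 and 17.0, while the body is traced only once. This is also why Python-side logic such as printing or logging is best kept outside the compiled step, as in the loop above.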

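Low-level handling of losses tracked by the model

Layers and models recursively track any losses created during the forward pass by layers that call self.add_loss(value). The resulting list of scalar loss values is available via the property model.losses at the end of the forward pass.

If you want to use these loss components, you should sum them and add them to the main loss in your training step.

Consider this layer, which creates an activity regularization loss: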

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

Let's build a really simple model that uses it:

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

Here's what our training step should look like now:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value
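
As a quick way to inspect what ends up in the tracked losses, the sketch below calls the regularization layer on its own and reads its losses property right after the forward pass. The all-ones input is a hypothetical value chosen so the result is easy to check by hand; a model exposes the same kind of list, gathered from all of its layers, through model.losses.

```python
import tensorflow as tf
from tensorflow.keras import layers


class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs


layer = ActivityRegularizationLayer()

# Hypothetical input: a batch of 2 samples with 3 features, all equal to 1.
_ = layer(tf.ones((2, 3)))

# One add_loss call during the forward pass gives one entry in the list.
num_losses = len(layer.losses)
extra_loss = float(sum(layer.losses))

print(num_losses, extra_loss)
```

The single tracked loss is 0.01 times the sum of the six inputs, so about 0.06. This is the quantity that sum(model.losses) adds to the main loss in the training step.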

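Summary

You now know how to use the built-in training loops and how to write your own from scratch.

To conclude, here's a simple end-to-end example that ties together everything you've learned in this guide: a DCGAN trained on MNIST digits.

End-to-end example: a GAN training loop from scratch

You may be familiar with Generative Adversarial Networks (GANs). GANs can generate new images that look almost real by learning the latent distribution of a training dataset of images (the "latent space" of the images).

A GAN is made of two parts: a "generator" model that maps points in the latent space to points in image space, and a "discriminator" model, a classifier that can tell the difference between real images (from the training dataset) and fake images (the output of the generator network).

A GAN training loop looks like this: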

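  1. Train the discriminator.
  • Sample a batch of random points in the latent space.
  • Turn the points into fake images via the "generator" model.
  • Get a batch of real images and combine them with the generated images.
  • Train the "discriminator" model to classify generated vs. real images.
  2. Train the generator.
  • Sample random points in the latent space.
  • Turn the points into fake images via the "generator" network.
  • Train the "generator" model to "fool" the discriminator into classifying the fake images as real.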

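For a much more detailed overview of how GANs work, see Deep Learning with Python.

Let's implement this training loop. First, create the discriminator meant to classify fake vs. real digits: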

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d (Global (None, 128)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 1)                 129       
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________
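
The figures in this summary can be reproduced by hand. The sketch below recomputes the parameter counts and the spatial sizes with plain Python; the helper functions are hypothetical names written for this illustration, and they assume the usual rules that a convolution has one kernel plus one bias per filter and that padding="same" yields an output size of ceil(size / stride).

```python
def conv2d_params(kernel_h, kernel_w, in_channels, filters):
    # One kernel of shape kernel_h x kernel_w x in_channels per filter, plus one bias each.
    return kernel_h * kernel_w * in_channels * filters + filters


def dense_params(in_features, units):
    # One weight per (input, unit) pair, plus one bias per unit.
    return in_features * units + units


def same_padding_output(size, stride):
    # Ceiling division: the spatial output size with padding="same".
    return -(-size // stride)


conv1 = conv2d_params(3, 3, 1, 64)
conv2 = conv2d_params(3, 3, 64, 128)
dense = dense_params(128, 1)
total = conv1 + conv2 + dense

size_after_conv1 = same_padding_output(28, 2)
size_after_conv2 = same_padding_output(size_after_conv1, 2)

print(conv1, conv2, dense, total, size_after_conv1, size_after_conv2)
```

This gives 640, 73856 and 129 parameters for the three weighted layers, 74625 in total, and feature maps of size 14 and then 7, matching the table. The LeakyReLU and pooling layers have no weights.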

Then let's create a generator network that turns latent vectors into outputs of shape (28, 28, 1) (representing MNIST digits):

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 7 * 7 * 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

Here's the key bit: the training loop. As you can see, it is quite straightforward. The training step function only takes 17 lines.

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images
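
The label convention in train_step is easy to misread: generated images come first in the combined batch and get the label 1, real images get the label 0, and this is why the generator is later trained against all-zero "misleading" labels. The sketch below rebuilds just the label tensor for a tiny batch to make the layout and the effect of the noise visible; the sizes num_fake and num_real are hypothetical.

```python
import tensorflow as tf

num_fake, num_real = 2, 3  # hypothetical, tiny batch sizes

# Same layout as in train_step: ones for generated images, zeros for real images.
labels = tf.concat([tf.ones((num_fake, 1)), tf.zeros((num_real, 1))], axis=0)

# Uniform noise in [0, 0.05) is added to every label.
noisy_labels = labels + 0.05 * tf.random.uniform(tf.shape(labels))

fake_part = noisy_labels[:num_fake].numpy()
real_part = noisy_labels[num_fake:].numpy()

print(fake_part.min(), fake_part.max(), real_part.min(), real_part.max())
```

After the noise is added, the labels of generated images lie between 1.0 and 1.05 and those of real images between 0.0 and 0.05, so the two groups stay clearly separated.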

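Let's train our GAN by repeatedly calling train_step on batches of images.

Since our discriminator and generator are convnets, you're going to want to run this code on a GPU.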

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 12 steps (steps 0 through 11).
        # Remove the lines below to actually train the model!
        if step > 10:
            break
Start epoch 0
discriminator loss at step 0: 0.70
adversarial loss at step 0: 0.70

That's it! You'll get nice-looking fake MNIST digits after just ~30s of training on the Colab GPU.