Introduction
When you're doing supervised learning, you can use fit() and everything works smoothly.
When you need to write your own training loop from scratch, you can use GradientTape and take control of every little detail.
But what if you need a custom training algorithm, yet still want to benefit from the convenient features of fit(), such as callbacks, built-in distribution support, or step fusing?
A core principle of Keras is the progressive disclosure of complexity. You should always be able to get into lower-level workflows in a gradual way. You shouldn't fall off a cliff if the high-level functionality doesn't exactly match your use case. You should be able to gain more control over the small details while retaining a commensurate amount of high-level convenience.
When you need to customize what fit() does, you should override the training step function of the Model class. This is the function that fit() calls for every batch of data. You will then be able to call fit() as usual, and it will be running your own learning algorithm.
Note that this pattern does not prevent you from building models with the Functional API. You can use it whether you're building Sequential models, Functional API models, or subclassed models.
Let's see how it works.
Setup
Requires TensorFlow 2.2 or later.
import tensorflow as tf
from tensorflow import keras
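As a quick sanity check (a minimal sketch, not part of the original guide), you can confirm the installed version meets the requirement above:

print("TensorFlow version:", tf.__version__)  # this guide assumes TF 2.2 or later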
A first simple example
Let's start from a simple example:
- We create a new class that subclasses keras.Model.
- We just override the method train_step(self, data).
- We return a dictionary mapping metric names (including the loss) to their current value.
The input argument data is what gets passed to fit() as training data:
- If you pass Numpy arrays, by calling fit(x, y, ...), then data will be the tuple (x, y).
- If you pass a tf.data.Dataset, by calling fit(dataset, ...), then data will be what gets yielded by dataset at each batch.
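As a concrete illustration of both cases (a minimal sketch with hypothetical shapes, not part of the original guide), the two call forms hand the training step the same per-batch structure:

import numpy as np

x = np.random.random((64, 32)).astype("float32")
y = np.random.random((64, 1)).astype("float32")

# Case 1: Numpy arrays -- model.fit(x, y) makes `data` the tuple (x, y).
# Case 2: a tf.data.Dataset -- `data` is whatever the dataset yields per batch:
dataset = tf.data.Dataset.from_tensor_slices((x, y)).batch(16)
for data in dataset.take(1):
    x_batch, y_batch = data  # the same unpacking used in train_step below
    print(x_batch.shape, y_batch.shape)  # (16, 32) (16, 1)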
In the body of the train_step method, we implement a regular training update, similar to what you are already familiar with. Importantly, we compute the loss via self.compiled_loss, which wraps the loss function(s) passed to compile().
Similarly, we call self.compiled_metrics.update_state(y, y_pred) to update the state of the metrics that were passed in compile(), and we query results from self.metrics at the end to retrieve their current value.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
Let's try this out:
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 1.8958 - mae: 1.2818
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.8889 - mae: 0.8171
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5347
<keras.callbacks.History at 0x7f9f90038e20>
Going lower-level
Naturally, you could simply skip passing a loss function in compile() and instead do everything manually in train_step. Likewise for metrics.
Here's a lower-level example that only uses compile() to configure the optimizer:
- We start by creating Metric instances to track our loss and an MAE score.
- We implement a custom train_step() that updates the state of these metrics (by calling update_state() on them), then queries them (via result()) to return their current average value, which is displayed by the progress bar and passed to any callback.
- Note that we would need to call reset_states() on our metrics between each epoch! Otherwise, calling result() would return an average since the start of training, whereas we usually work with per-epoch averages. Thankfully, the framework can do that for us: just list any metric you want to reset in the metrics property of the model. The model will call reset_states() on any object listed there at the beginning of each fit() epoch or at the beginning of a call to evaluate().
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
class CustomModel(keras.Model):
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
loss_tracker.update_state(loss)
mae_metric.update_state(y, y_pred)
return {"loss": loss_tracker.result(), "mae": mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [loss_tracker, mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't pass a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2442 - mae: 0.3902
Epoch 2/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2286 - mae: 0.3773
Epoch 3/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2163 - mae: 0.3672
Epoch 4/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2047 - mae: 0.3578
Epoch 5/5
32/32 [==============================] - 0s 2ms/step - loss: 0.1942 - mae: 0.3491
<keras.callbacks.History at 0x7f9f6c339f70>
Supporting sample_weight and class_weight
You may have noticed that our first basic example didn't make any mention of sample weighting. If you want to support the fit() arguments sample_weight and class_weight, you'd simply do the following:
- Unpack sample_weight from the data argument.
- Pass it to compiled_loss and compiled_metrics (of course, you could also just apply it manually if you don't rely on compile() for losses and metrics).
- That's it.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.3687 - mae: 0.7165
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1641 - mae: 0.4603
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1271 - mae: 0.4127
<keras.callbacks.History at 0x7f9f6c2f40d0>
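class_weight flows through the same path: fit() converts it into per-sample weights before calling train_step, so the unpacking above handles it unchanged. A hedged usage sketch (the integer labels and weights dict below are arbitrary, not from the original guide):

# class_weight requires integer class labels in `y`
x = np.random.random((1000, 32))
y = np.random.randint(0, 2, size=(1000, 1))
# fit() turns this dict into per-batch sample weights, so `data` arrives
# in train_step as the 3-tuple (x, y, sample_weight)
model.fit(x, y, class_weight={0: 1.0, 1: 3.0}, epochs=1)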
Providing your own evaluation step
What if you want to do the same for calls to model.evaluate()? Then you would override test_step in exactly the same way. Here's what it looks like:
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 2ms/step - loss: 0.4197 - mae: 0.5246
[0.4196917414665222, 0.5245950818061829]
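As an aside not covered in this guide's text, the same mechanism extends to model.predict() through the predict_step method. A minimal sketch (CustomPredictModel is a hypothetical name introduced here for illustration):

class CustomPredictModel(keras.Model):
    def predict_step(self, data):
        # `data` may be just `x`, or a tuple, depending on what you pass to predict()
        x = data[0] if isinstance(data, tuple) else data
        return self(x, training=False)

inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
predict_model = CustomPredictModel(inputs, outputs)
predict_model.predict(x)  # runs our custom predict_step on each batch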
Wrapping up: an end-to-end GAN example
Let's walk through an end-to-end example that leverages everything you've just learned.
Consider:
- A generator network meant to generate 28x28x1 images.
- A discriminator network meant to classify 28x28x1 images into two classes ("fake" and "real").
- One optimizer for each network.
- A loss function to train the discriminator.
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
Here's a feature-complete GAN class, overriding compile() to use its own signature, and implementing the entire GAN algorithm in 17 lines of train_step:
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super(GAN, self).__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
def compile(self, d_optimizer, g_optimizer, loss_fn):
super(GAN, self).compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return {"d_loss": d_loss, "g_loss": g_loss}
Let's test-drive it:
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 5s 15ms/step - d_loss: 0.5041 - g_loss: 0.9849
<keras.callbacks.History at 0x7f9f6c0f9370>
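To eyeball what the generator has learned so far (a minimal sketch, not part of the original guide), you can decode a few random latent vectors directly:

random_latent_vectors = tf.random.normal(shape=(4, latent_dim))
generated_images = generator(random_latent_vectors)
print(generated_images.shape)  # (4, 28, 28, 1), values in [0, 1] from the sigmoid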
The ideas behind deep learning are simple, so why should their implementation be painful?