调试 TensorFlow 2 迁移的训练流水线

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

此笔记本演示了如何在迁移到 TensorFlow 2 (TF2) 时调试训练流水线。它由以下组件组成:

  1. 调试训练流水线的建议步骤和代码示例
  2. 用于调试的工具
  3. 其他相关资源

一个假设是您有用于比较的 TensorFlow 1 (TF1.x) 代码和已训练模型,并且您希望构建一个 TF2 模型来实现类似的验证准确率。

此笔记本涵盖有关训练/推断速度或内存使用量的调试性能问题。

调试工作流

下面是调试 TF2 训练流水线的一般工作流。请注意,您不需要按顺序执行这些步骤。您也可以使用二分查找方法,在中间步骤中测试模型并缩小调试范围。

  1. 修复编译和运行时错误

  2. 单次前向传递验证(在单独的指南中)

    a. 在单个 CPU 设备上

    • 验证变量是否只创建一次
    • 检查变量计数、名称和形状是否匹配
    • 重置所有变量,在停用所有随机性的情况下检查数值等价性
    • 对齐随机数生成,检查推断中的数值等价性
    • (可选)检查检查点已正确加载,TF1.x/TF2 模型生成相同的输出

    b. 在单个 GPU/TPU 设备上

    c. 采用多设备策略

  3. 几个步骤的模型训练数值等价性验证(下面提供代码示例)

    a. 在单个 CPU 设备上使用少量固定数据进行单次训练步骤验证。具体来说,检查以下组件的数值等价性

    • 损失计算
    • 指标
    • 学习率
    • 梯度计算和更新

    b. 在训练 3 个或更多步骤后检查统计数据,验证优化器的行为(如动量)在单个 CPU 设备上是否仍然使用固定数据

    c. 在单个 GPU/TPU 设备上

    d. 使用多设备策略(查看底部 MultiProcessRunner 的介绍)

  4. 对真实数据集的端到端收敛测试

    a. 使用 TensorBoard 检查训练行为

    • 首先使用 SGD 等简单的优化器和 tf.distribute.OneDeviceStrategy 等简单的分布策略
    • 训练指标
    • 评估指标
    • 找出对固有随机性的合理容忍度是多少

    b. 使用高级优化器/学习率调度器/分布策略检查等价性

    c. 使用混合精度时检查等价性

  5. 附加产品基准

安装

# The `DeterministicRandomTestTool` is only available from Tensorflow 2.8:
pip install -q "tensorflow==2.9.*"

单前向传递验证

单前向传递验证(包括检查点加载)将在不同的 colab 中介绍。

import sys
import unittest
import numpy as np

import tensorflow as tf
import tensorflow.compat.v1 as v1
2023-11-07 19:59:22.275296: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory

几个步骤的模型训练数值等价性验证

设置模型配置并准备一个假数据集。

params = {
    'input_size': 3,
    'num_classes': 3,
    'layer_1_size': 2,
    'layer_2_size': 2,
    'num_train_steps': 100,
    'init_lr': 1e-3,
    'end_lr': 0.0,
    'decay_steps': 1000,
    'lr_power': 1.0,
}

# make a small fixed dataset
fake_x = np.ones((2, params['input_size']), dtype=np.float32)
fake_y = np.zeros((2, params['num_classes']), dtype=np.int32)
fake_y[0][0] = 1
fake_y[1][1] = 1

step_num = 3

定义 TF1.x 模型。

# Assume there is an existing TF1.x model using estimator API
# Wrap the model_fn to log necessary tensors for result comparison
class SimpleModelWrapper():
  def __init__(self):
    self.logged_ops = {}
    self.logs = {
        'step': [],
        'lr': [],
        'loss': [],
        'grads_and_vars': [],
        'layer_out': []}

  def model_fn(self, features, labels, mode, params):
      out_1 = tf.compat.v1.layers.dense(features, units=params['layer_1_size'])
      out_2 = tf.compat.v1.layers.dense(out_1, units=params['layer_2_size'])
      logits = tf.compat.v1.layers.dense(out_2, units=params['num_classes'])
      loss = tf.compat.v1.losses.softmax_cross_entropy(labels, logits)

      # skip EstimatorSpec details for prediction and evaluation 
      if mode == tf.estimator.ModeKeys.PREDICT:
          pass
      if mode == tf.estimator.ModeKeys.EVAL:
          pass
      assert mode == tf.estimator.ModeKeys.TRAIN

      global_step = tf.compat.v1.train.get_or_create_global_step()
      lr = tf.compat.v1.train.polynomial_decay(
        learning_rate=params['init_lr'],
        global_step=global_step,
        decay_steps=params['decay_steps'],
        end_learning_rate=params['end_lr'],
        power=params['lr_power'])

      optmizer = tf.compat.v1.train.GradientDescentOptimizer(lr)
      grads_and_vars = optmizer.compute_gradients(
          loss=loss,
          var_list=graph.get_collection(
              tf.compat.v1.GraphKeys.TRAINABLE_VARIABLES))
      train_op = optmizer.apply_gradients(
          grads_and_vars,
          global_step=global_step)

      # log tensors
      self.logged_ops['step'] = global_step
      self.logged_ops['lr'] = lr
      self.logged_ops['loss'] = loss
      self.logged_ops['grads_and_vars'] = grads_and_vars
      self.logged_ops['layer_out'] = {
          'layer_1': out_1,
          'layer_2': out_2,
          'logits': logits}

      return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

  def update_logs(self, logs):
    for key in logs.keys():
      model_tf1.logs[key].append(logs[key])

以下 v1.keras.utils.DeterministicRandomTestTool 类提供了一个上下文管理器 scope(),它可以使有状态的随机运算在 TF1 计算图/会话和 Eager Execution 中使用相同的种子。

此工具提供两种测试模式:

  1. constant,无论被调用过多少次,都会为每个单一运算使用相同的种子,以及
  2. num_random_ops,使用先前观测到的有状态随机运算的数量作为运算种子。

这既适用于用于创建和初始化变量的有状态随机运算,也适用于计算中使用的有状态随机运算(例如用于随机失活层)。

random_tool = v1.keras.utils.DeterministicRandomTestTool(mode='num_random_ops')
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_234052/2689227634.py:1: The name tf.keras.utils.DeterministicRandomTestTool is deprecated. Please use tf.compat.v1.keras.utils.DeterministicRandomTestTool instead.

在计算图模式下运行 TF1.x 模型。收集前 3 个训练步骤的统计数据以进行数值等价性比较。

with random_tool.scope():
  graph = tf.Graph()
  with graph.as_default(), tf.compat.v1.Session(graph=graph) as sess:
    model_tf1 = SimpleModelWrapper()
    # build the model
    inputs = tf.compat.v1.placeholder(tf.float32, shape=(None, params['input_size']))
    labels = tf.compat.v1.placeholder(tf.float32, shape=(None, params['num_classes']))
    spec = model_tf1.model_fn(inputs, labels, tf.estimator.ModeKeys.TRAIN, params)
    train_op = spec.train_op

    sess.run(tf.compat.v1.global_variables_initializer())
    for step in range(step_num):
      # log everything and update the model for one step
      logs, _ = sess.run(
          [model_tf1.logged_ops, train_op],
          feed_dict={inputs: fake_x, labels: fake_y})
      model_tf1.update_logs(logs)
2023-11-07 19:59:25.069458: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-11-07 19:59:25.069585: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcublas.so.11'; dlerror: libcublas.so.11: cannot open shared object file: No such file or directory
2023-11-07 19:59:25.069663: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcublasLt.so.11'; dlerror: libcublasLt.so.11: cannot open shared object file: No such file or directory
2023-11-07 19:59:25.069737: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcufft.so.10'; dlerror: libcufft.so.10: cannot open shared object file: No such file or directory
2023-11-07 19:59:25.149993: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcusparse.so.11'; dlerror: libcusparse.so.11: cannot open shared object file: No such file or directory
2023-11-07 19:59:25.150247: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1850] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
/tmpfs/tmp/ipykernel_234052/1984550333.py:14: UserWarning: `tf.layers.dense` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dense` instead.
  out_1 = tf.compat.v1.layers.dense(features, units=params['layer_1_size'])
/tmpfs/tmp/ipykernel_234052/1984550333.py:15: UserWarning: `tf.layers.dense` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dense` instead.
  out_2 = tf.compat.v1.layers.dense(out_1, units=params['layer_2_size'])
/tmpfs/tmp/ipykernel_234052/1984550333.py:16: UserWarning: `tf.layers.dense` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dense` instead.
  logits = tf.compat.v1.layers.dense(out_2, units=params['num_classes'])

定义 TF2 模型。

class SimpleModel(tf.keras.Model):
  def __init__(self, params, *args, **kwargs):
    super(SimpleModel, self).__init__(*args, **kwargs)
    # define the model
    self.dense_1 = tf.keras.layers.Dense(params['layer_1_size'])
    self.dense_2 = tf.keras.layers.Dense(params['layer_2_size'])
    self.out = tf.keras.layers.Dense(params['num_classes'])
    learning_rate_fn = tf.keras.optimizers.schedules.PolynomialDecay(
      initial_learning_rate=params['init_lr'],
      decay_steps=params['decay_steps'],
      end_learning_rate=params['end_lr'],
      power=params['lr_power'])  
    self.optimizer = tf.keras.optimizers.legacy.SGD(learning_rate_fn)
    self.compiled_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
    self.logs = {
        'lr': [],
        'loss': [],
        'grads': [],
        'weights': [],
        'layer_out': []}

  def call(self, inputs):
    out_1 = self.dense_1(inputs)
    out_2 = self.dense_2(out_1)
    logits = self.out(out_2)
    # log output features for every layer for comparison
    layer_wise_out = {
        'layer_1': out_1,
        'layer_2': out_2,
        'logits': logits}
    self.logs['layer_out'].append(layer_wise_out)
    return logits

  def train_step(self, data):
    x, y = data
    with tf.GradientTape() as tape:
      logits = self(x)
      loss = self.compiled_loss(y, logits)
    grads = tape.gradient(loss, self.trainable_weights)
    # log training statistics
    step = self.optimizer.iterations.numpy()
    self.logs['lr'].append(self.optimizer.learning_rate(step).numpy())
    self.logs['loss'].append(loss.numpy())
    self.logs['grads'].append(grads)
    self.logs['weights'].append(self.trainable_weights)
    # update model
    self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
    return

在 Eager 模式下运行 TF2 模型。收集前 3 个训练步骤的统计数据以进行数值等价性比较。

random_tool = v1.keras.utils.DeterministicRandomTestTool(mode='num_random_ops')
with random_tool.scope():
  model_tf2 = SimpleModel(params)
  for step in range(step_num):
    model_tf2.train_step([fake_x, fake_y])

比较前几个训练步骤的数值等价性。

您还可以查看验证正确性和数值等价性笔记本以获得关于数值等价性的额外建议。

np.testing.assert_allclose(model_tf1.logs['lr'], model_tf2.logs['lr'])
np.testing.assert_allclose(model_tf1.logs['loss'], model_tf2.logs['loss'])
for step in range(step_num):
  for name in model_tf1.logs['layer_out'][step]:
    np.testing.assert_allclose(
        model_tf1.logs['layer_out'][step][name],
        model_tf2.logs['layer_out'][step][name])

单元测试

有几种类型的单元测试可以帮助调试迁移代码。

  1. 单前向传递验证
  2. 几个步骤的模型训练数值等价性验证
  3. 基准推断性能
  4. 已训练模型对固定和简单的数据点做出正确预测

可以使用 @parameterized.parameters 来测试具有不同配置的模型。包含代码示例的详细信息

请注意,可以在同一个测试用例中运行会话 API 和 Eager Execution。下面的代码段显示了具体方式。

import unittest

class TestNumericalEquivalence(unittest.TestCase):

  # copied from code samples above
  def setup(self):
    # record statistics for 100 training steps
    step_num = 100

    # setup TF 1 model
    random_tool = v1.keras.utils.DeterministicRandomTestTool(mode='num_random_ops')
    with random_tool.scope():
      # run TF1.x code in graph mode with context management
      graph = tf.Graph()
      with graph.as_default(), tf.compat.v1.Session(graph=graph) as sess:
        self.model_tf1 = SimpleModelWrapper()
        # build the model
        inputs = tf.compat.v1.placeholder(tf.float32, shape=(None, params['input_size']))
        labels = tf.compat.v1.placeholder(tf.float32, shape=(None, params['num_classes']))
        spec = self.model_tf1.model_fn(inputs, labels, tf.estimator.ModeKeys.TRAIN, params)
        train_op = spec.train_op

        sess.run(tf.compat.v1.global_variables_initializer())
        for step in range(step_num):
          # log everything and update the model for one step
          logs, _ = sess.run(
              [self.model_tf1.logged_ops, train_op],
              feed_dict={inputs: fake_x, labels: fake_y})
          self.model_tf1.update_logs(logs)

    # setup TF2 model
    random_tool = v1.keras.utils.DeterministicRandomTestTool(mode='num_random_ops')
    with random_tool.scope():
      self.model_tf2 = SimpleModel(params)
      for step in range(step_num):
        self.model_tf2.train_step([fake_x, fake_y])

  def test_learning_rate(self):
    np.testing.assert_allclose(
        self.model_tf1.logs['lr'],
        self.model_tf2.logs['lr'])

  def test_training_loss(self):
    # adopt different tolerance strategies before and after 10 steps
    first_n_step = 10

    # absolute difference is limited below 1e-5
    # set `equal_nan` to be False to detect potential NaN loss issues
    abosolute_tolerance = 1e-5
    np.testing.assert_allclose(
        actual=self.model_tf1.logs['loss'][:first_n_step],
        desired=self.model_tf2.logs['loss'][:first_n_step],
        atol=abosolute_tolerance,
        equal_nan=False)

    # relative difference is limited below 5%
    relative_tolerance = 0.05
    np.testing.assert_allclose(self.model_tf1.logs['loss'][first_n_step:],
                               self.model_tf2.logs['loss'][first_n_step:],
                               rtol=relative_tolerance,
                               equal_nan=False)

调试工具

tf.print

tf.print 与 print/logging.info

  • 利用可配置参数,tf.print 能够以递归方式显示打印张量的每个维度的前几个和最后几个元素。请查看 API 文档以了解详情。
  • 对于 Eager Execution,printtf.print 都会打印张量的值。但 print 可能涉及设备到主机的复制,这可能会减慢代码速度。
  • 对于包含 tf.function 内用法的计算图模式,您需要使用 tf.print 打印实际张量值。tf.print 被编译成计算图中的一个运算,而 printlogging.info 只在跟踪时记录,这通常不是您希望的。
  • tf.print 还支持打印复合张量,如 tf.RaggedTensortf.sparse.SparseTensor
  • 您还可以使用回调来监视指标和变量。请检查如何使用带有日志字典self.model 特性的自定义回调。

tf.print 与 tf.function 内的 print

# `print` prints info of tensor object
# `tf.print` prints the tensor value
@tf.function
def dummy_func(num):
  num += 1
  print(num)
  tf.print(num)
  return num

_ = dummy_func(tf.constant([1.0]))

# Output:
# Tensor("add:0", shape=(1,), dtype=float32)
# [2]
Tensor("add:0", shape=(1,), dtype=float32)
[2]

tf.distribute.Strategy

  • 如果包含 tf.printtf.function 在工作进程上执行,例如使用 TPUStrategyParameterServerStrategy 时,您需要检查工作进程/参数服务器日志以查找打印的值。
  • 对于 printlogging.info,使用 ParameterServerStrategy 时将在协调器上打印日志,使用 TPU 时将在 worker0 的 STDOUT 上打印日志。

tf.keras.Model

  • 使用序列式和函数式 API 模型时,如果您想在某些层之后打印值,例如模型输入或中间特征,您可以选择以下选项。
    1. 编写一个自定义层,该层使用 tf.print 打印输入。
    2. 在模型输出中包含要检查的中间输出。
  • tf.keras.layers.Lambda 层具有(反)序列化限制。为避免检查点加载问题,请改为编写自定义子类化层。请参阅 API 文档,了解更多详细信息。
  • 如果您无权访问实际值,则无法在 tf.keras.callbacks.LambdaCallback 中使用 tf.print 打印中间输出,而只能访问符号 Keras 张量对象。

选项 1:编写一个自定义层

class PrintLayer(tf.keras.layers.Layer):
  def call(self, inputs):
    tf.print(inputs)
    return inputs

def get_model():
  inputs = tf.keras.layers.Input(shape=(1,))
  out_1 = tf.keras.layers.Dense(4)(inputs)
  out_2 = tf.keras.layers.Dense(1)(out_1)
  # use custom layer to tf.print intermediate features
  out_3 = PrintLayer()(out_2)
  model = tf.keras.Model(inputs=inputs, outputs=out_3)
  return model

model = get_model()
model.compile(optimizer="adam", loss="mse")
model.fit([1, 2, 3], [0.0, 0.0, 1.0])
[[-0.327884018]
 [-0.109294683]
 [-0.218589365]]
1/1 [==============================] - 0s 319ms/step - loss: 0.6077
<keras.callbacks.History at 0x7f00cfd4c460>

选项 2:在模型输出中包含要检查的中间输出。

请注意,在这种情况下,您可能需要进行一些自定义才能使用 Model.fit

def get_model():
  inputs = tf.keras.layers.Input(shape=(1,))
  out_1 = tf.keras.layers.Dense(4)(inputs)
  out_2 = tf.keras.layers.Dense(1)(out_1)
  # include intermediate values in model outputs
  model = tf.keras.Model(
      inputs=inputs,
      outputs={
          'inputs': inputs,
          'out_1': out_1,
          'out_2': out_2})
  return model

pdb

可以在终端和 Colab 中使用 pdb 来检查中间值以进行调试。

使用 TensorBoard 呈现计算图

可以使用 TensorBoard 检查 TensorFlow 计算图Colab 上也支持 TensorBoard。TensorBoard 是呈现摘要的绝佳工具。您可以利用它来比较训练过程中的学习率、模型权重、梯度尺度、训练/验证指标,甚至是 TF1.x 模型和迁移的 TF2 模型之间的模型中间输出,并查看值是否符合预期。

TensorFlow Profiler

TensorFlow Profiler 可以帮助您呈现 GPU/TPU 上的执行时间线。可以查看此 Colab 演示来了解其基本用法。

MultiProcessRunner

在使用 MultiWorkerMirroredStrategy 和 ParameterServerStrategy 进行调试时,MultiProcessRunner 是一个实用的工具。可以查看此具体示例来了解它的用法。

特别是对于这两种策略的情况,建议您 1) 不仅要使用单元测试来覆盖它们的流,2) 还要尝试在单元测试中使用它来重现失败,以避免每次尝试修复时都启动真正的分布式作业。