在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述
在本教程中,您将学习如何将 DTensor 与 Keras 一起使用。
通过将 DTensor 与 Keras 集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。
您将使用 MNIST 数据训练多层分类模型。本文将演示如何设置子类化模型、序贯模型和函数式模型的布局。
本教程假设您已经阅读了 DTensor 编程指南,并且熟悉基本的 DTensor 概念,例如 Mesh
和 Layout
。
本教程基于 https://tensorflow.google.cn/datasets/keras_example。
安装
DTensor 是 TensorFlow 2.9.0 版本的一部分。
pip install --quiet --upgrade --pre tensorflow tensorflow-datasets
接下来,导入 tensorflow
和 tensorflow.experimental.dtensor
,并将 TensorFlow 配置为使用 8 个虚拟 CPU。
尽管本示例使用了 CPU,但 DTensor 在 CPU、GPU 或 TPU 设备上的工作方式相同。
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
2023-11-07 23:29:54.584316: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2023-11-07 23:29:54.584359: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2023-11-07 23:29:54.585900: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
def configure_virtual_cpus(ncpu):
phy_devices = tf.config.list_physical_devices('CPU')
tf.config.set_logical_device_configuration(
phy_devices[0],
[tf.config.LogicalDeviceConfiguration()] * ncpu)
configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')
devices = [f'CPU:{i}' for i in range(8)]
确定性伪随机数生成器
您应当注意的一件事是 DTensor API 要求每个正在运行的客户端具有相同的随机种子,以便它可以具有用于初始化权重的确定性行为。可以通过 tf.keras.utils.set_random_seed()
在 Keras 中设置全局种子来实现此目的。
tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)
创建数据并行网格
本教程演示数据并行训练。适应模型并行训练和空间并行训练可以像切换到一组不同的 Layout
对象一样简单。有关数据并行之外的分布式训练的更多信息,请参阅 DTensor 深入机器学习教程。
数据并行训练是一种常用的并行训练方案,也被诸如 tf.distribute.MirroredStrategy
等使用。
使用 DTensor,数据并行训练循环使用由单个“批次”维度组成的 Mesh
,其中每个设备都会运行模型的副本,从全局批次接收分片。
mesh = dtensor.create_mesh([("batch", 8)], devices=devices)
由于每个设备都运行模型的完整副本,模型变量应在网格中完全复制(不分片)。例如,此 Mesh
上 2 秩权重的完全复制布局如下:
example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)
此 Mesh
上 2 秩数据张量的布局将沿第一个维度进行分片(有时称为 batch_sharded
),
example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh) # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
使用布局创建 Keras 层
在数据并行方案中,您通常使用完全复制的布局创建模型权重,以便模型的每个副本都可以使用分片输入数据进行计算。
为了为您的层权重配置布局信息,Keras 在层构造函数中为大多数内置层公开了一个额外的参数。
以下示例使用完全复制的权重布局构建了一个小型图像分类模型。您可以通过参数 kernel_layout
和 bias_layout
在 tf.keras.layers.Dense
中指定布局信息 kernel
和 bias
。大多数内置 Keras 层都可以显式地指定层权重的 Layout
。
unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128,
activation='relu',
name='d1',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d),
tf.keras.layers.Dense(10,
name='d2',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d)
])
您可以通过检查权重的 layout
属性来查看布局信息。
for weight in model.weights:
print(f'Weight name: {weight.name} with layout: {weight.layout}')
break
Weight name: d1/kernel:0 with layout: Layout.from_string(sharding_specs:unsharded,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
加载数据集并构建输入流水线
加载一个 MNIST 数据集并为其配置一些预处理输入流水线。数据集本身与任何 DTensor 布局信息不关联。我们计划在未来的 TensorFlow 版本中改进 DTensor Keras 与 tf.data
的集成。
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
batch_size = 128
ds_train = ds_train.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
定义模型的训练逻辑
接下来,定义模型的训练和评估逻辑。
从 TensorFlow 2.9 开始,您必须为启用 DTensor 的 Keras 模型编写自定义训练循环。这是为了用适当的布局信息打包输入数据,这些信息未与 Keras 中的标准 tf.keras.Model.fit()
或 tf.keras.Model.eval()
函数集成。您将在即将发布的版本中获得更多 tf.data
支持。
@tf.function
def train_step(model, x, y, optimizer, metrics):
with tf.GradientTape() as tape:
logits = model(x, training=True)
# tf.reduce_sum sums the batch sharded per-example loss to a replicated
# global loss (scalar).
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'loss': loss_per_sample}
return results
@tf.function
def eval_step(model, x, y, metrics):
logits = model(x, training=False)
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'eval_loss': loss_per_sample}
return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
num_local_devices = image_layout.mesh.num_local_devices()
images = tf.split(images, num_local_devices)
labels = tf.split(labels, num_local_devices)
images = dtensor.pack(images, image_layout)
labels = dtensor.pack(labels, label_layout)
return images, labels
指标和优化器
将 DTensor API 与 Keras Metric
和 Optimizer
一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都可以使用模型中的变量。
对于优化器,DTensor 引入了一个新的实验性命名空间
keras.dtensor.experimental.optimizers
,其中扩展了许多现有的 Keras 优化器以接收额外的mesh
参数。在未来的版本中,它可能会与 Keras 核心优化器合并。对于指标,可以直接将
mesh
作为参数指定给构造函数,使其成为兼容 DTensor 的Metric
。
optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
训练模型
以下示例在批次维度上对来自输入流水线的数据进行分片,并使用具有完全复制权重的模型进行训练。
经过 3 个周期后,模型应当达到大约 97% 的准确率。
num_epochs = 3
image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
for epoch in range(num_epochs):
print("============================")
print("Epoch: ", epoch)
for metric in metrics.values():
metric.reset_state()
step = 0
results = {}
pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
for input in ds_train:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(train_step(model, images, labels, optimizer, metrics))
for metric_name, metric in metrics.items():
results[metric_name] = metric.result()
pbar.update(step, values=results.items(), finalize=False)
step += 1
pbar.update(step, values=results.items(), finalize=True)
for metric in eval_metrics.values():
metric.reset_state()
for input in ds_test:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(eval_step(model, images, labels, eval_metrics))
for metric_name, metric in eval_metrics.items():
results[metric_name] = metric.result()
for metric_name, metric in results.items():
print(f"{metric_name}: {metric.numpy()}")
============================ Epoch: 0 469/Unknown - 7s 16ms/step - loss: 0.2907 - accuracy: 0.8308 469/Unknown - 4s 9ms/step - loss: 0.1285 - accuracy: 0.9595 469/Unknown - 4s 9ms/step - loss: 0.1010 - accuracy: 0.9682 loss: 0.044021397829055786 accuracy: 0.9682833552360535 eval_loss: 0.05413995310664177 eval_accuracy: 0.9656000137329102
为现有模型代码指定布局
通常,您的模型非常适合您的用例。为模型中的每个单独层指定 Layout
信息将是一项需要大量编辑的工作。
为了帮助您轻松地将现有 Keras 模型转换为使用 DTensor API,可以使用新的 dtensor.LayoutMap
API,它允许您从全局角度指定 Layout
。
首先,您需要创建一个 LayoutMap
实例,它是一个类似字典的对象,其中包含您要为模型权重指定的所有 Layout
。
LayoutMap
在初始化时需要一个 Mesh
实例,该实例可用于为任何未配置布局的权重提供默认的复制 Layout
。如果您希望完全复制所有模型权重,则可以提供空的 LayoutMap
,默认网格将用于创建复制的 Layout
。
LayoutMap
使用字符串作为键,使用 Layout
作为值。普通的 Python 字典与此类之间存在行为差异。检索值时,字符串键将被视为正则表达式
子类化模型
考虑使用 Keras 子类化模型语法定义的以下模型。
class SubclassedModel(tf.keras.Model):
def __init__(self, name=None):
super().__init__(name=name)
self.feature = tf.keras.layers.Dense(16)
self.feature_2 = tf.keras.layers.Dense(24)
self.dropout = tf.keras.layers.Dropout(0.1)
def call(self, inputs, training=None):
x = self.feature(inputs)
x = self.dropout(x, training=training)
return self.feature_2(x)
此模型中有 4 个权重,分别是两个 Dense
层的 kernel
和 bias
。它们中的每一个都基于对象路径进行映射:
model.feature.kernel
model.feature.bias
model.feature_2.kernel
model.feature_2.bias
注:对于子类化模型,特性名称而不是层的 .name
特性用作从映射中检索布局的键。这与 tf.Module
检查点遵循的约定一致。对于具有多个层的复杂模型,您可以手动检查检查点来查看特性映射。
现在,定义以下 LayoutMap
并将其应用于模型。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
subclassed_model = SubclassedModel()
模型权重是在第一次调用时创建的,因此使用 DTensor 输入调用模型并确认权重具有预期的布局。
dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)
print(subclassed_model.feature.kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
这样一来,您就可以快速将 Layout
映射到您的模型,而无需更新任何现有代码。
序贯模型和函数式模型
对于 Keras 序贯和函数式模型,您也可以使用 LayoutMap
。
注:对于序贯模型和函数式模型,映射略有不同。模型中的层没有附加到模型的公共特性(尽管可以通过 model.layers
作为列表访问它们)。在这种情况下,使用字符串名称作为键。字符串名称保证在模型中是唯一的。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
inputs = tf.keras.Input((16,), batch_size=16)
x = tf.keras.layers.Dense(16, name='feature')(inputs)
x = tf.keras.layers.Dropout(0.1)(x)
output = tf.keras.layers.Dense(32, name='feature_2')(x)
model = tf.keras.Model(inputs, output)
print(model.layers[1].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
with layout_map.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
tf.keras.layers.Dropout(0.1),
tf.keras.layers.Dense(32, name='feature_2')
])
print(model.layers[2].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)