FFJORD

设置

首先,安装本演示使用的软件包。

pip install -q dm-sonnet

Imports (tf, tfp with adjoint trick, etc)

import numpy as np
import tqdm as tqdm
import sklearn.datasets as skd

# visualization
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import kde

# tf and friends
import tensorflow.compat.v2 as tf
import tensorflow_probability as tfp
import sonnet as snt
tf.enable_v2_behavior()

tfb = tfp.bijectors
tfd = tfp.distributions

def make_grid(xmin, xmax, ymin, ymax, gridlines, pts):
  xpts = np.linspace(xmin, xmax, pts)
  ypts = np.linspace(ymin, ymax, pts)
  xgrid = np.linspace(xmin, xmax, gridlines)
  ygrid = np.linspace(ymin, ymax, gridlines)
  xlines = np.stack([a.ravel() for a in np.meshgrid(xpts, ygrid)])
  ylines = np.stack([a.ravel() for a in np.meshgrid(xgrid, ypts)])
  return np.concatenate([xlines, ylines], 1).T

grid = make_grid(-3, 3, -3, 3, 4, 100)

/usr/local/lib/python3.6/dist-packages/statsmodels/tools/_testing.py:19: FutureWarning: pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.
  import pandas.util.testing as tm

Helper functions for visualization

def plot_density(data, axis):
  x, y = np.squeeze(np.split(data, 2, axis=1))
  levels = np.linspace(0.0, 0.75, 10)
  kwargs = {'levels': levels}
  return sns.kdeplot(x, y, cmap="viridis", shade=True, 
                     shade_lowest=True, ax=axis, **kwargs)


def plot_points(data, axis, s=10, color='b', label=''):
  x, y = np.squeeze(np.split(data, 2, axis=1))
  axis.scatter(x, y, c=color, s=s, label=label)


def plot_panel(
    grid, samples, transformed_grid, transformed_samples,
    dataset, axarray, limits=True):
  if len(axarray) != 4:
    raise ValueError('Expected 4 axes for the panel')
  ax1, ax2, ax3, ax4 = axarray
  plot_points(data=grid, axis=ax1, s=20, color='black', label='grid')
  plot_points(samples, ax1, s=30, color='blue', label='samples')
  plot_points(transformed_grid, ax2, s=20, color='black', label='ode(grid)')
  plot_points(transformed_samples, ax2, s=30, color='blue', label='ode(samples)')
  ax3 = plot_density(transformed_samples, ax3)
  ax4 = plot_density(dataset, ax4)
  if limits:
    set_limits([ax1], -3.0, 3.0, -3.0, 3.0)
    set_limits([ax2], -2.0, 3.0, -2.0, 3.0)
    set_limits([ax3, ax4], -1.5, 2.5, -0.75, 1.25)


def set_limits(axes, min_x, max_x, min_y, max_y):
  if isinstance(axes, list):
    for axis in axes:
      set_limits(axis, min_x, max_x, min_y, max_y)
  else:
    axes.set_xlim(min_x, max_x)
    axes.set_ylim(min_y, max_y)

FFJORD 双射器

在此 Colab 中,我们将演示 FFJORD 双射器,此双射器最初由 Grathwohl、Will 等人在其论文(arXiv 链接)中提出。

简而言之,这种方式背后的思想是在已知的基础分布数据分布之间建立对应关系。

为了建立这种联系,我们需要进行以下操作:

  1. 在定义基础分布的空间 Y 与数据域的空间 X 之间定义一个双射映射 Tθ:xy, Tθ1:yx
  2. 有效地跟踪我们执行的将概率概念转移到 X 上的变形。

X 上定义的概率分布的以下表达式中对第二个条件进行了形式化:

logpx(x)=logpy(y)logdet|Tθ(y)y|

FFJORD 双射器通过定义以下转换来实现这一点:Tθ:x=z(t0)y=z(t1):dzdt=f(t,z,θ)

只要描述状态 z 演化的函数 f 表现良好,并且可以通过集成以下表达式来计算 log_det_jacobian,则此转换可逆。

logdet|Tθ(y)y|=t0t1Tr(f(t,z,θ)z(t))dt

在此演示中,我们将训练 FFJORD 双射器,将高斯分布扭曲到 moons 数据集定义的分布上。这将分 3 个步骤完成:

  • 定义基础分布
  • 定义 FFJORD 双射器
  • 最小化数据集的精确对数似然。

首先,我们加载数据

Dataset

DATASET_SIZE = 1024 * 8 
BATCH_SIZE = 256 
SAMPLE_SIZE = DATASET_SIZE

moons = skd.make_moons(n_samples=DATASET_SIZE, noise=.06)[0]

moons_ds = tf.data.Dataset.from_tensor_slices(moons.astype(np.float32))
moons_ds = moons_ds.prefetch(tf.data.experimental.AUTOTUNE)
moons_ds = moons_ds.cache()
moons_ds = moons_ds.shuffle(DATASET_SIZE)
moons_ds = moons_ds.batch(BATCH_SIZE)

plt.figure(figsize=[8, 8])
plt.scatter(moons[:, 0], moons[:, 1])
plt.show()

png

接下来,我们实例化基础分布

base_loc = np.array([0.0, 0.0]).astype(np.float32)
base_sigma = np.array([0.8, 0.8]).astype(np.float32)
base_distribution = tfd.MultivariateNormalDiag(base_loc, base_sigma)

我们使用多层感知器对 state_derivative_fn 进行建模。

虽然对此数据集来说并非必需,但使 state_derivative_fn 依赖于时间通常有益。在这里,我们通过将 t 连接到网络的输入来实现这一点。

class MLP_ODE(snt.Module):
  """Multi-layer NN ode_fn."""
  def __init__(self, num_hidden, num_layers, num_output, name='mlp_ode'):
    super(MLP_ODE, self).__init__(name=name)
    self._num_hidden = num_hidden
    self._num_output = num_output
    self._num_layers = num_layers
    self._modules = []
    for _ in range(self._num_layers - 1):
      self._modules.append(snt.Linear(self._num_hidden))
      self._modules.append(tf.math.tanh)
    self._modules.append(snt.Linear(self._num_output))
    self._model = snt.Sequential(self._modules)

  def __call__(self, t, inputs):
    inputs = tf.concat([tf.broadcast_to(t, inputs.shape), inputs], -1)
    return self._model(inputs)

Model and training parameters

LR = 1e-2 
NUM_EPOCHS = 80 
STACKED_FFJORDS = 4 
NUM_HIDDEN = 8 
NUM_LAYERS = 3 
NUM_OUTPUT = 2

现在,我们构造一个 FFJORD 双射器的堆栈。为每个双射器提供 ode_solve_fntrace_augmentation_fn,以及它自己的 state_derivative_fn 模型,因此它们表示一个不同转换的序列。

Building bijector

solver = tfp.math.ode.DormandPrince(atol=1e-5)
ode_solve_fn = solver.solve
trace_augmentation_fn = tfb.ffjord.trace_jacobian_exact

bijectors = []
for _ in range(STACKED_FFJORDS):
  mlp_model = MLP_ODE(NUM_HIDDEN, NUM_LAYERS, NUM_OUTPUT)
  next_ffjord = tfb.FFJORD(
      state_time_derivative_fn=mlp_model,ode_solve_fn=ode_solve_fn,
      trace_augmentation_fn=trace_augmentation_fn)
  bijectors.append(next_ffjord)

stacked_ffjord = tfb.Chain(bijectors[::-1])

现在,我们可以使用 TransformedDistribution,这是使用 stacked_ffjord 双射器扭曲 base_distribution 的结果。

transformed_distribution = tfd.TransformedDistribution(
    distribution=base_distribution, bijector=stacked_ffjord)

现在,我们来定义训练过程。只需使数据的负对数似然最小化。

Training

@tf.function
def train_step(optimizer, target_sample):
  with tf.GradientTape() as tape:
    loss = -tf.reduce_mean(transformed_distribution.log_prob(target_sample))
  variables = tape.watched_variables()
  gradients = tape.gradient(loss, variables)
  optimizer.apply(gradients, variables)
  return loss

Samples

@tf.function
def get_samples():
  base_distribution_samples = base_distribution.sample(SAMPLE_SIZE)
  transformed_samples = transformed_distribution.sample(SAMPLE_SIZE)
  return base_distribution_samples, transformed_samples


@tf.function
def get_transformed_grid():
  transformed_grid = stacked_ffjord.forward(grid)
  return transformed_grid

根据基础分布和转换分布绘制样本。

evaluation_samples = []
base_samples, transformed_samples = get_samples()
transformed_grid = get_transformed_grid()
evaluation_samples.append((base_samples, transformed_samples, transformed_grid))
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/resource_variable_ops.py:1817: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
panel_id = 0
panel_data = evaluation_samples[panel_id]
fig, axarray = plt.subplots(
  1, 4, figsize=(16, 6))
plot_panel(
    grid, panel_data[0], panel_data[2], panel_data[1], moons, axarray, False)
plt.tight_layout()

png

learning_rate = tf.Variable(LR, trainable=False)
optimizer = snt.optimizers.Adam(learning_rate)

for epoch in tqdm.trange(NUM_EPOCHS // 2):
  base_samples, transformed_samples = get_samples()
  transformed_grid = get_transformed_grid()
  evaluation_samples.append(
      (base_samples, transformed_samples, transformed_grid))
  for batch in moons_ds:
    _ = train_step(optimizer, batch)
0%|          | 0/40 [00:00<?, ?it/s]
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_probability/python/math/ode/base.py:350: calling while_loop_v2 (from tensorflow.python.ops.control_flow_ops) with back_prop=False is deprecated and will be removed in a future version.
Instructions for updating:
back_prop=False is deprecated. Consider using tf.stop_gradient instead.
Instead of:
results = tf.while_loop(c, b, vars, back_prop=False)
Use:
results = tf.nest.map_structure(tf.stop_gradient, tf.while_loop(c, b, vars))
100%|██████████| 40/40 [07:00<00:00, 10.52s/it]
panel_id = -1
panel_data = evaluation_samples[panel_id]
fig, axarray = plt.subplots(
  1, 4, figsize=(16, 6))
plot_panel(grid, panel_data[0], panel_data[2], panel_data[1], moons, axarray)
plt.tight_layout()

png

通过学习率对其训练更长时间,可获得进一步改善。

本例中未涉及,FFJORD 双射器支持哈钦森的随机迹估算。可通过 trace_augmentation_fn 提供特定的 estimator。同样,也可以通过定义自定义 ode_solve_fn 来使用替代积分器。