简单的音频识别:识别关键词

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 Github 上查看源代码 下载笔记本

本教程演示了如何预处理 WAV 格式的音频文件,并构建和训练一个基本的自动语音识别 (ASR) 模型来识别十个不同的单词。您将使用 Speech Commands 数据集Warden,2018 年)的一部分,其中包含命令的短(一秒或更短)音频片段,例如“down”、“go”、“left”、“no”、“right”、“stop”、“up”和“yes”。

现实世界的语音和音频识别系统很复杂。但是,就像使用 MNIST 数据集进行图像分类一样,本教程应该能够使您对所涉及的技术有一个基本的了解。

设置

导入必要的模块和依赖项。您将使用 tf.keras.utils.audio_dataset_from_directory(在 TensorFlow 2.10 中引入),它有助于从 .wav 文件的目录生成音频分类数据集。在本教程中,您还需要 seaborn 进行呈现。

pip install -U -q tensorflow tensorflow_datasets
apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display

# Set the seed value for experiment reproducibility.
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

导入迷你 Speech Commands 数据集

为了节省数据加载时间,您将使用较小版本的 Speech Commands 数据集。原始数据集包含超过 105,000 个音频文件,采用 WAV(波形)音频文件格式,内容是不同的人们说出 35 个不同的单词。此数据由 Google 收集并根据 CC BY 许可发布。

使用 tf.keras.utils.get_file 下载并提取包含较 Speech Commands 数据集的 mini_speech_commands.zip

DATASET_PATH = 'data/mini_speech_commands'

data_dir = pathlib.Path(DATASET_PATH)
if not data_dir.exists():
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.', cache_subdir='data')

数据集的音频片段存储在与每个语音命令对应的八个文件夹中:noyesdowngoleftuprightstop

commands = np.array(tf.io.gfile.listdir(str(data_dir)))
commands = commands[(commands != 'README.md') & (commands != '.DS_Store')]
print('Commands:', commands)

以这种方式划分目录,您可以使用 keras.utils.audio_dataset_from_directory 轻松加载数据。

音频剪辑在 16kHz 时为 1 秒或更短。output_sequence_length=16000 会将短音频剪辑填充到恰好 1 秒(并且会修剪较长的音频剪辑),以便可以轻松地对它们进行批处理。

train_ds, val_ds = tf.keras.utils.audio_dataset_from_directory(
    directory=data_dir,
    batch_size=64,
    validation_split=0.2,
    seed=0,
    output_sequence_length=16000,
    subset='both')

label_names = np.array(train_ds.class_names)
print()
print("label names:", label_names)

数据集现在包含批量的音频剪辑和整数标签。音频剪辑的形状为 (batch, samples, channels)

train_ds.element_spec

此数据集仅包含单声道音频,因此使用 tf.squeeze 函数删除额外的轴:

def squeeze(audio, labels):
  audio = tf.squeeze(audio, axis=-1)
  return audio, labels

train_ds = train_ds.map(squeeze, tf.data.AUTOTUNE)
val_ds = val_ds.map(squeeze, tf.data.AUTOTUNE)

utils.audio_dataset_from_directory 函数最多只返回两个拆分。将测试集与验证集分开是一个好主意。理想情况下,您会将其保存在单独的目录中,但在这种情况下,您可以使用 Dataset.shard 将验证集拆分成两半。请注意,遍历任何分片将加载所有数据,并且只保留它的片段。

test_ds = val_ds.shard(num_shards=2, index=0)
val_ds = val_ds.shard(num_shards=2, index=1)
for example_audio, example_labels in train_ds.take(1):  
  print(example_audio.shape)
  print(example_labels.shape)

我们来绘制一些音频波形:

label_names[[1,1,3,0]]
plt.figure(figsize=(16, 10))
rows = 3
cols = 3
n = rows * cols
for i in range(n):
  plt.subplot(rows, cols, i+1)
  audio_signal = example_audio[i]
  plt.plot(audio_signal)
  plt.title(label_names[example_labels[i]])
  plt.yticks(np.arange(-1.2, 1.2, 0.2))
  plt.ylim([-1.1, 1.1])

将波形转换为频谱图

数据集中的波形在时域中表示。接下来,您将通过计算短时傅里叶变换 (STFT) 将波形从时域信号转换为时频域信号,以将波形转换为频谱图,显示频率随时间的变化,并且可以表示为二维图像。您将把频谱图图像输入您的神经网络以训练模型。

傅里叶变换 (tf.signal.fft) 会将信号转换为其分量频率,但会丢失所有时间信息。相比之下,STFT (tf.signal.stft) 会将信号拆分为时间窗口,并在每个窗口上运行傅里叶变换,保留一些时间信息,并返回可以运行标准卷积的二维张量。

创建用于将波形转换为频谱图的效用函数:

  • 这些波形需要具有相同的长度,以便将它们转换为频谱图时,结果具有相似的维度。这可以通过简单地对短于一秒的音频片段进行零填充(使用 tf.zeros)来完成。
  • 调用 tf.signal.stft 时,请选择 frame_lengthframe_step 参数,使生成的频谱图“图像”几乎为方形。有关 STFT 参数选择的更多信息,请参阅有关音频信号处理和 STFT 的 Coursera 视频
  • STFT 会产生表示幅度和相位的复数数组。但是,在本教程中,您将只使用幅度,您可以通过在 tf.signal.stft 的输出上应用 tf.abs 来获得该 tf.signal.stft
def get_spectrogram(waveform):
  # Convert the waveform to a spectrogram via a STFT.
  spectrogram = tf.signal.stft(
      waveform, frame_length=255, frame_step=128)
  # Obtain the magnitude of the STFT.
  spectrogram = tf.abs(spectrogram)
  # Add a `channels` dimension, so that the spectrogram can be used
  # as image-like input data with convolution layers (which expect
  # shape (`batch_size`, `height`, `width`, `channels`).
  spectrogram = spectrogram[..., tf.newaxis]
  return spectrogram

接下来,开始探索数据。打印一个样本的张量波形形状和相应的频谱图,并播放原始音频:

for i in range(3):
  label = label_names[example_labels[i]]
  waveform = example_audio[i]
  spectrogram = get_spectrogram(waveform)

  print('Label:', label)
  print('Waveform shape:', waveform.shape)
  print('Spectrogram shape:', spectrogram.shape)
  print('Audio playback')
  display.display(display.Audio(waveform, rate=16000))

现在,定义一个显示频谱图的函数:

def plot_spectrogram(spectrogram, ax):
  if len(spectrogram.shape) > 2:
    assert len(spectrogram.shape) == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)
  # Convert the frequencies to log scale and transpose, so that the time is
  # represented on the x-axis (columns).
  # Add an epsilon to avoid taking a log of zero.
  log_spec = np.log(spectrogram.T + np.finfo(float).eps)
  height = log_spec.shape[0]
  width = log_spec.shape[1]
  X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
  Y = range(height)
  ax.pcolormesh(X, Y, log_spec)

绘制样本随时间变化的波形和相应的频谱图(随时间变化的频率):

fig, axes = plt.subplots(2, figsize=(12, 8))
timescale = np.arange(waveform.shape[0])
axes[0].plot(timescale, waveform.numpy())
axes[0].set_title('Waveform')
axes[0].set_xlim([0, 16000])

plot_spectrogram(spectrogram.numpy(), axes[1])
axes[1].set_title('Spectrogram')
plt.suptitle(label.title())
plt.show()

接下来,从音频数据集创建频谱图数据集:

def make_spec_ds(ds):
  return ds.map(
      map_func=lambda audio,label: (get_spectrogram(audio), label),
      num_parallel_calls=tf.data.AUTOTUNE)
train_spectrogram_ds = make_spec_ds(train_ds)
val_spectrogram_ds = make_spec_ds(val_ds)
test_spectrogram_ds = make_spec_ds(test_ds)

检查数据集不同样本的频谱图:

for example_spectrograms, example_spect_labels in train_spectrogram_ds.take(1):
  break
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(16, 9))

for i in range(n):
    r = i // cols
    c = i % cols
    ax = axes[r][c]
    plot_spectrogram(example_spectrograms[i].numpy(), ax)
    ax.set_title(label_names[example_spect_labels[i].numpy()])

plt.show()

构建并训练模型

添加 Dataset.cacheDataset.prefetch 运算以减少训练模型时的读取延迟:

train_spectrogram_ds = train_spectrogram_ds.cache().shuffle(10000).prefetch(tf.data.AUTOTUNE)
val_spectrogram_ds = val_spectrogram_ds.cache().prefetch(tf.data.AUTOTUNE)
test_spectrogram_ds = test_spectrogram_ds.cache().prefetch(tf.data.AUTOTUNE)

对于模型,您将使用简单的卷积神经网络 (CNN),因为您已将音频文件转换为频谱图图像。

您的 tf.keras.Sequential 模型将使用以下 Keras 预处理层:

对于 Normalization 层,首先需要在训练数据上调用其 adapt 方法,以计算聚合统计数据(即均值和标准差)。

input_shape = example_spectrograms.shape[1:]
print('Input shape:', input_shape)
num_labels = len(label_names)

# Instantiate the `tf.keras.layers.Normalization` layer.
norm_layer = layers.Normalization()
# Fit the state of the layer to the spectrograms
# with `Normalization.adapt`.
norm_layer.adapt(data=train_spectrogram_ds.map(map_func=lambda spec, label: spec))

model = models.Sequential([
    layers.Input(shape=input_shape),
    # Downsample the input.
    layers.Resizing(32, 32),
    # Normalize.
    norm_layer,
    layers.Conv2D(32, 3, activation='relu'),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels),
])

model.summary()

使用 Adam 优化器和交叉熵损失配置 Keras 模型:

model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

出于演示目的,将模型训练超过 10 个周期:

EPOCHS = 10
history = model.fit(
    train_spectrogram_ds,
    validation_data=val_spectrogram_ds,
    epochs=EPOCHS,
    callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
)

我们来绘制训练和验证损失曲线,以检查您的模型在训练期间的改进情况:

metrics = history.history
plt.figure(figsize=(16,6))
plt.subplot(1,2,1)
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch')
plt.ylabel('Loss [CrossEntropy]')

plt.subplot(1,2,2)
plt.plot(history.epoch, 100*np.array(metrics['accuracy']), 100*np.array(metrics['val_accuracy']))
plt.legend(['accuracy', 'val_accuracy'])
plt.ylim([0, 100])
plt.xlabel('Epoch')
plt.ylabel('Accuracy [%]')

评估模型性能

在测试集上运行模型并检查模型的性能:

model.evaluate(test_spectrogram_ds, return_dict=True)

显示混淆矩阵

使用混淆矩阵检查模型对测试集中每个命令的分类效果:

y_pred = model.predict(test_spectrogram_ds)
y_pred = tf.argmax(y_pred, axis=1)
y_true = tf.concat(list(test_spectrogram_ds.map(lambda s,lab: lab)), axis=0)
confusion_mtx = tf.math.confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(confusion_mtx,
            xticklabels=label_names,
            yticklabels=label_names,
            annot=True, fmt='g')
plt.xlabel('Prediction')
plt.ylabel('Label')
plt.show()

对音频文件运行推断

最后,使用某人说“no”的输入音频文件验证模型的预测输出。您的模型表现如何?

x = data_dir/'no/01bb6a2a_nohash_0.wav'
x = tf.io.read_file(str(x))
x, sample_rate = tf.audio.decode_wav(x, desired_channels=1, desired_samples=16000,)
x = tf.squeeze(x, axis=-1)
waveform = x
x = get_spectrogram(x)
x = x[tf.newaxis,...]

prediction = model(x)
x_labels = ['no', 'yes', 'down', 'go', 'left', 'up', 'right', 'stop']
plt.bar(x_labels, tf.nn.softmax(prediction[0]))
plt.title('No')
plt.show()

display.display(display.Audio(waveform, rate=16000))

如输出所示,您的模型应该已将音频命令识别为“no”。

通过预处理导出模型

如果您必须在将数据传递给模型进行推断之前应用这些预处理步骤,则该模型不是很易于使用。因此,构建一个端到端的版本:

class ExportModel(tf.Module):
  def __init__(self, model):
    self.model = model

    # Accept either a string-filename or a batch of waveforms.
    # YOu could add additional signatures for a single wave, or a ragged-batch. 
    self.__call__.get_concrete_function(
        x=tf.TensorSpec(shape=(), dtype=tf.string))
    self.__call__.get_concrete_function(
       x=tf.TensorSpec(shape=[None, 16000], dtype=tf.float32))


  @tf.function
  def __call__(self, x):
    # If they pass a string, load the file and decode it. 
    if x.dtype == tf.string:
      x = tf.io.read_file(x)
      x, _ = tf.audio.decode_wav(x, desired_channels=1, desired_samples=16000,)
      x = tf.squeeze(x, axis=-1)
      x = x[tf.newaxis, :]

    x = get_spectrogram(x)  
    result = self.model(x, training=False)

    class_ids = tf.argmax(result, axis=-1)
    class_names = tf.gather(label_names, class_ids)
    return {'predictions':result,
            'class_ids': class_ids,
            'class_names': class_names}

测试运行“导出”模型:

export = ExportModel(model)
export(tf.constant(str(data_dir/'no/01bb6a2a_nohash_0.wav')))

保存并重新加载模型,重新加载的模型给出了相同的输出:

tf.saved_model.save(export, "saved")
imported = tf.saved_model.load("saved")
imported(waveform[tf.newaxis, :])

后续步骤

本教程演示了如何使用带有 TensorFlow 和 Python 的卷积神经网络执行简单的音频分类/自动语音识别。要了解更多信息,请考虑以下资源: