Apache Beam 和 TFX

Apache Beam 提供了一个框架,用于运行在各种执行引擎上运行的数据批处理和流处理作业。一些 TFX 库使用 Beam 运行任务,实现了跨计算集群的高度可扩展性。Beam 包含对各种执行引擎或“运行程序”的支持,其中包括在单个计算节点上运行的直接运行程序,这对于开发、测试或小型部署而言非常实用。Beam 提供了一个抽象层,使 TFX 无需修改代码便可在任何支持的运行程序上运行。TFX 使用 Beam Python API,因此仅适用于 Python API 支持的运行程序。

部署和可扩展性

随着工作负载要求的增加,Beam 可以扩展到跨大型计算集群的超大型部署。它的可扩展性仅受限于底层运行程序的可扩展性。大型部署中的运行程序通常将部署到诸如 Kubernetes 或 Apache Mesos 之类的容器编排系统中,实现应用部署、扩展和管理的自动化。

有关 Apache Beam 的更多信息,请参阅 Apache Beam 文档。

对于 Google Cloud 用户,推荐使用 Dataflow,它通过自动扩缩资源,动态工作再平衡,与其他 Google Cloud 服务深度集成,内置安全性,以及监控。

自定义 Python 代码和依赖关系

在 TFX 流水线中使用 Beam 的一个明显的复杂性是处理自定义代码和/或需要从其他 Python 模块获取的依赖关系。这可能会在以下示例中成为问题:

  • preprocessing_fn 需要引用用户自己的 Python 模块
  • 用于 Evaluator 组件的自定义提取程序
  • 从 TFX 组件子类化的自定义模块

TFX 依赖于 Beam 对管理 Python 流水线依赖关系的支持来处理 Python 依赖项。目前有两种管理方式:

  1. 以源代码包的形式提供 Python 代码和依赖关系
  2. [仅限 Dataflow] 将容器映像用作工作进程

下文将讨论这些问题。

以源代码包的形式提供 Python 代码和依赖关系

建议以下用户使用此方式:

  1. 熟悉 Python 打包,并且
  2. 仅使用 Python 源代码(即,没有 C 模块或共享库)。

请按照管理 Python 流水线依赖关系的其中一个路径,使用以下 beam_pipeline_args 中的一项来提供此功能:

  • --setup_file
  • --extra_package
  • --requirements_file

注意:在上述任何情况下,请务必将相同版本的 tfx 列为依赖项。

[仅限 Dataflow] 将容器映像用作工作进程

TFX 0.26.0 及更高版本具有针对 Dataflow 工作进程使用自定义容器映像的实验性支持。

为了使用此功能,您必须:

  • 构建预装了 tfx 和用户自定义代码与依赖项的 Docker 映像。
    • 对于 (1) 使用 tfx>=0.26 和 (2) 使用 Python 3.7 开发流水线的用户,最简单的方式是扩展官方 tensorflow/tfx 映像的相应版本:
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • 将构建的映像推送到可以由 Dataflow 使用的项目访问的容器映像注册中心。
    • Google Cloud 用户可以考虑使用 Cloud Build,它可以很好地将上述步骤自动化。
  • 提供以下 beam_pipeline_args
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562):当其为 Dataflow 的默认值时,移除 use_runner_v2

TODO(b/179738639):创建有关如何在 https://issues.apache.org/jira/browse/BEAM-5440 之后在本地测试自定义容器的文档。

Beam 流水线参数

一些 TFX 组件依赖 Beam 进行分布式数据处理。可以通过 beam_pipeline_args 对它们进行配置,该参数在流水线创建期间指定:

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

TFX 0.30 及更高版本添加了一个接口,with_beam_pipeline_args,用于扩展每个组件的流水线级别 Beam 参数:

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])