Apache Beam и TFX

Apache Beam предоставляет платформу для запуска заданий пакетной и потоковой обработки данных, которые выполняются на различных механизмах выполнения. Некоторые библиотеки TFX используют Beam для выполнения задач, что обеспечивает высокую степень масштабируемости в вычислительных кластерах. Beam включает поддержку различных механизмов выполнения или «бегунов», включая прямой исполнитель, который работает на одном вычислительном узле и очень полезен для разработки, тестирования или небольших развертываний. Beam предоставляет уровень абстракции, который позволяет TFX работать на любом поддерживаемом движке без изменений кода. TFX использует API Beam Python, поэтому он ограничен теми бегунами, которые поддерживаются API Python.

Развертывание и масштабируемость

По мере увеличения требований к рабочей нагрузке Beam может масштабироваться до очень крупных развертываний в крупных вычислительных кластерах. Это ограничено только масштабируемостью базового бегуна. Исполнители в крупных развертываниях обычно развертываются в системе оркестрации контейнеров, такой как Kubernetes или Apache Mesos, для автоматизации развертывания, масштабирования и управления приложениями.

Дополнительную информацию об Apache Beam см. в документации Apache Beam .

Для пользователей Google Cloud рекомендуется использовать Dataflow , который предоставляет бессерверную и экономичную платформу за счет автоматического масштабирования ресурсов, динамической ребалансировки работы, глубокой интеграции с другими сервисами Google Cloud, встроенной безопасности и мониторинга.

Пользовательский код Python и зависимости

Одной из заметных сложностей использования Beam в конвейере TFX является обработка пользовательского кода и/или зависимостей, необходимых от дополнительных модулей Python. Вот несколько примеров, когда это может быть проблемой:

  • preprocessing_fn должен ссылаться на собственный модуль Python пользователя.
  • специальный экстрактор для компонента Evaluator
  • пользовательские модули, которые являются подклассами компонента TFX

TFX полагается на поддержку Beam для управления зависимостями конвейера Python для обработки зависимостей Python. На данный момент есть два способа справиться с этим:

  1. Предоставление кода Python и зависимостей в качестве исходного пакета
  2. [Только для потока данных] Использование образа контейнера в качестве рабочего процесса

Они обсуждаются далее.

Предоставление кода Python и зависимостей в качестве исходного пакета

Рекомендуется для пользователей, которые:

  1. Знаете упаковку Python и
  2. Используйте только исходный код Python (т. е. никаких модулей C или общих библиотек).

Пожалуйста, следуйте одному из путей в разделе «Управление зависимостями конвейера Python» , чтобы предоставить это, используя один из следующих параметров beam_pipeline_args:

  • --setup_file
  • --extra_package
  • --requirements_file

Примечание. В любом из вышеперечисленных случаев убедитесь, что в качестве зависимости указана одна и та же версия tfx .

[Только для потока данных] Использование образа контейнера для работника

TFX 0.26.0 и более поздних версий имеет экспериментальную поддержку использования пользовательского образа контейнера для работников Dataflow.

Чтобы использовать это, вам необходимо:

  • Создайте образ Docker, в котором предварительно установлены как tfx , так и пользовательский код и зависимости.
    • Для пользователей, которые (1) используют tfx>=0.26 и (2) используют Python 3.7 для разработки своих конвейеров, самый простой способ сделать это — расширить соответствующую версию официального образа tensorflow/tfx :
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • Отправьте созданный образ в реестр образов контейнеров, доступный для проекта, используемого Dataflow.
    • Пользователи Google Cloud могут рассмотреть возможность использования Cloud Build , который прекрасно автоматизирует описанные выше шаги.
  • Предоставьте следующий beam_pipeline_args :
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562): Удалите use_runner_v2, как только он будет использоваться по умолчанию для потока данных.

TODO(b/179738639): Создайте документацию по локальному тестированию пользовательского контейнера после https://issues.apache.org/jira/browse/BEAM-5440.

Аргументы лучевого конвейера

Некоторые компоненты TFX используют Beam для распределенной обработки данных. Они настраиваются с помощью beam_pipeline_args , который указывается при создании конвейера:

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

В TFX 0.30 и выше добавлен интерфейс with_beam_pipeline_args для расширения аргументов луча уровня конвейера для каждого компонента:

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])