В этом руководстве описывается, как использовать TFX API для создания полностью настраиваемого компонента. Полностью настраиваемые компоненты позволяют создавать компоненты, определяя классы спецификации компонента, исполнителя и интерфейса компонента. Такой подход позволяет повторно использовать и расширять стандартный компонент в соответствии с вашими потребностями.
Если вы новичок в конвейерах TFX, узнайте больше об основных концепциях конвейеров TFX .
Пользовательский исполнитель или пользовательский компонент
Если требуется только пользовательская логика обработки, а входные, выходные данные и свойства выполнения компонента такие же, как у существующего компонента, достаточно специального исполнителя. Полностью настраиваемый компонент необходим, когда какие-либо входные, выходные или свойства выполнения отличаются от любых существующих компонентов TFX.
Как создать собственный компонент?
Для разработки полностью пользовательского компонента требуется:
- Определенный набор спецификаций входных и выходных артефактов для нового компонента. В частности, типы входных артефактов должны соответствовать типам выходных артефактов компонентов, которые создают артефакты, а типы выходных артефактов должны соответствовать типам входных артефактов компонентов, которые используют артефакты, если таковые имеются.
- Параметры выполнения без артефактов, необходимые для нового компонента.
Спецификация компонента
Класс ComponentSpec
определяет контракт компонента, определяя входные и выходные артефакты компонента, а также параметры, которые используются для выполнения компонента. Он состоит из трех частей:
- ВХОДЫ : словарь типизированных параметров для входных артефактов, которые передаются исполнителю компонента. Обычно входные артефакты представляют собой выходные данные вышестоящих компонентов и, следовательно, имеют один и тот же тип.
- ВЫХОДЫ : словарь типизированных параметров для выходных артефактов, которые создает компонент.
- ПАРАМЕТРЫ : словарь дополнительных элементов ExecutionParameter , которые будут переданы исполнителю компонента. Это параметры, не являющиеся артефактами, которые мы хотим гибко определить в конвейере DSL и передать в выполнение.
Вот пример ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
Исполнитель
Затем напишите код исполнителя для нового компонента. По сути, необходимо создать новый подкласс base_executor.BaseExecutor
с переопределенной функцией Do
В функции Do
аргументы input_dict
, output_dict
и exec_properties
, которые передаются в сопоставлении с INPUTS
, OUTPUTS
и PARAMETERS
, которые определены в ComponentSpec соответственно. Для exec_properties
значение можно получить непосредственно посредством поиска в словаре. Для артефактов в input_dict
и output_dict
есть удобные функции, доступные в классе артефакта_utils , которые можно использовать для получения экземпляра артефакта или URI артефакта.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
Модульное тестирование пользовательского исполнителя
Модульные тесты для пользовательского исполнителя можно создавать аналогично этому .
Интерфейс компонента
Теперь, когда самая сложная часть завершена, следующим шагом будет сборка этих частей в интерфейс компонента, чтобы компонент можно было использовать в конвейере. Есть несколько шагов:
- Сделайте интерфейс компонента подклассом
base_component.BaseComponent
- Назначьте переменную класса
SPEC_CLASS
с классомComponentSpec
, который был определен ранее. - Назначьте переменную класса
EXECUTOR_SPEC
с классом Executor, который был определен ранее. - Определите функцию-конструктор
__init__()
используя аргументы функции для создания экземпляра класса ComponentSpec и вызовите суперфункцию с этим значением вместе с необязательным именем.
При создании экземпляра компонента будет вызвана логика проверки типов в классе base_component.BaseComponent
, чтобы убедиться, что переданные аргументы совместимы с информацией о типе, определенной в классе ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
Сборка в конвейер TFX
Последний шаг — подключить новый пользовательский компонент к конвейеру TFX. Помимо добавления экземпляра нового компонента, также необходимо следующее:
- Правильно подключите к нему вышестоящие и нисходящие компоненты нового компонента. Это делается путем ссылки на выходные данные вышестоящего компонента в новом компоненте и ссылки на выходные данные нового компонента в последующих компонентах.
- Добавьте новый экземпляр компонента в список компонентов при построении конвейера.
В приведенном ниже примере показаны вышеупомянутые изменения. Полный пример можно найти в репозитории TFX на GitHub .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
Развертывание полностью пользовательского компонента
Помимо изменений кода, все вновь добавленные части ( ComponentSpec
, Executor
, интерфейс компонента) должны быть доступны в среде выполнения конвейера, чтобы конвейер работал правильно.