Questa guida descrive come utilizzare l'API TFX per creare un componente completamente personalizzato. I componenti completamente personalizzati consentono di creare componenti definendo la specifica del componente, l'esecutore e le classi dell'interfaccia del componente. Questo approccio consente di riutilizzare ed estendere un componente standard per adattarlo alle proprie esigenze.
Se non conosci le pipeline TFX, scopri di più sui concetti fondamentali delle pipeline TFX .
Esecutore personalizzato o componente personalizzato
Se è necessaria solo la logica di elaborazione personalizzata mentre gli input, gli output e le proprietà di esecuzione del componente sono gli stessi di un componente esistente, è sufficiente un esecutore personalizzato. È necessario un componente completamente personalizzato quando uno qualsiasi degli input, output o proprietà di esecuzione è diverso da qualsiasi componente TFX esistente.
Come creare un componente personalizzato?
Lo sviluppo di un componente completamente personalizzato richiede:
- Un insieme definito di specifiche degli artefatti di input e output per il nuovo componente. In particolare, i tipi per gli artefatti di input dovrebbero essere coerenti con i tipi di artefatti di output dei componenti che producono gli artefatti e i tipi per gli artefatti di output dovrebbero essere coerenti con i tipi di artefatti di input dei componenti che consumano gli artefatti, se presenti.
- I parametri di esecuzione non artefatto necessari per il nuovo componente.
Specifica componente
La classe ComponentSpec
definisce il contratto del componente definendo gli artefatti di input e output per un componente nonché i parametri utilizzati per l'esecuzione del componente. Ha tre parti:
- INPUT : un dizionario di parametri tipizzati per gli artefatti di input che vengono passati all'esecutore del componente. Normalmente gli artefatti di input sono gli output dei componenti a monte e quindi condividono lo stesso tipo.
- OUTPUTS : un dizionario di parametri tipizzati per gli artefatti di output prodotti dal componente.
- PARAMETRI : un dizionario di elementi ExecutionParameter aggiuntivi che verranno passati all'esecutore del componente. Si tratta di parametri non artefatti che vogliamo definire in modo flessibile nella pipeline DSL e passare in esecuzione.
Ecco un esempio di ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
Esecutore
Successivamente, scrivi il codice dell'esecutore per il nuovo componente. Fondamentalmente, è necessario creare una nuova sottoclasse di base_executor.BaseExecutor
con la sua funzione Do
sovrascritta. Nella funzione Do
, gli argomenti input_dict
, output_dict
e exec_properties
passati nella mappano rispettivamente a INPUTS
, OUTPUTS
e PARAMETERS
definiti in ComponentSpec. Per exec_properties
, il valore può essere recuperato direttamente tramite una ricerca nel dizionario. Per gli artefatti in input_dict
e output_dict
, sono disponibili funzioni utili nella classe artifact_utils che possono essere utilizzate per recuperare l'istanza dell'artefatto o l'URI dell'artefatto.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
Test unitario di un esecutore personalizzato
È possibile creare unit test per l'esecutore personalizzato in modo simile a questo .
Interfaccia dei componenti
Ora che la parte più complessa è stata completata, il passaggio successivo consiste nell'assemblare questi pezzi in un'interfaccia componente, per consentire l'utilizzo del componente in una pipeline. Ci sono diversi passaggi:
- Rendere l'interfaccia del componente una sottoclasse di
base_component.BaseComponent
- Assegna una variabile di classe
SPEC_CLASS
con la classeComponentSpec
definita in precedenza - Assegna una variabile di classe
EXECUTOR_SPEC
alla classe Executor definita in precedenza - Definire la funzione di costruzione
__init__()
utilizzando gli argomenti della funzione per costruire un'istanza della classe ComponentSpec e invocare la super funzione con quel valore, insieme a un nome facoltativo
Quando viene creata un'istanza del componente, verrà richiamata la logica di controllo del tipo nella classe base_component.BaseComponent
per garantire che gli argomenti passati siano compatibili con le informazioni sul tipo definite nella classe ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
Assemblare in una pipeline TFX
L'ultimo passaggio consiste nel collegare il nuovo componente personalizzato a una pipeline TFX. Oltre ad aggiungere un'istanza del nuovo componente, sono necessari anche quanto segue:
- Cablare correttamente i componenti a monte e a valle del nuovo componente. Ciò viene fatto facendo riferimento agli output del componente a monte nel nuovo componente e facendo riferimento agli output del nuovo componente nei componenti a valle
- Aggiungi la nuova istanza del componente all'elenco dei componenti durante la costruzione della pipeline.
L'esempio seguente evidenzia le modifiche sopra menzionate. L'esempio completo è disponibile nel repository TFX GitHub .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
Distribuisci un componente completamente personalizzato
Oltre alle modifiche al codice, tutte le parti appena aggiunte ( ComponentSpec
, Executor
, interfaccia del componente) devono essere accessibili nell'ambiente di esecuzione della pipeline per poter eseguire correttamente la pipeline.