Creazione di componenti completamente personalizzati

Questa guida descrive come utilizzare l'API TFX per creare un componente completamente personalizzato. I componenti completamente personalizzati consentono di creare componenti definendo la specifica del componente, l'esecutore e le classi dell'interfaccia del componente. Questo approccio consente di riutilizzare ed estendere un componente standard per adattarlo alle proprie esigenze.

Se non conosci le pipeline TFX, scopri di più sui concetti fondamentali delle pipeline TFX .

Esecutore personalizzato o componente personalizzato

Se è necessaria solo la logica di elaborazione personalizzata mentre gli input, gli output e le proprietà di esecuzione del componente sono gli stessi di un componente esistente, è sufficiente un esecutore personalizzato. È necessario un componente completamente personalizzato quando uno qualsiasi degli input, output o proprietà di esecuzione è diverso da qualsiasi componente TFX esistente.

Come creare un componente personalizzato?

Lo sviluppo di un componente completamente personalizzato richiede:

  • Un insieme definito di specifiche degli artefatti di input e output per il nuovo componente. In particolare, i tipi per gli artefatti di input dovrebbero essere coerenti con i tipi di artefatti di output dei componenti che producono gli artefatti e i tipi per gli artefatti di output dovrebbero essere coerenti con i tipi di artefatti di input dei componenti che consumano gli artefatti, se presenti.
  • I parametri di esecuzione non artefatto necessari per il nuovo componente.

Specifica componente

La classe ComponentSpec definisce il contratto del componente definendo gli artefatti di input e output per un componente nonché i parametri utilizzati per l'esecuzione del componente. Ha tre parti:

  • INPUT : un dizionario di parametri tipizzati per gli artefatti di input che vengono passati all'esecutore del componente. Normalmente gli artefatti di input sono gli output dei componenti a monte e quindi condividono lo stesso tipo.
  • OUTPUTS : un dizionario di parametri tipizzati per gli artefatti di output prodotti dal componente.
  • PARAMETRI : un dizionario di elementi ExecutionParameter aggiuntivi che verranno passati all'esecutore del componente. Si tratta di parametri non artefatti che vogliamo definire in modo flessibile nella pipeline DSL e passare in esecuzione.

Ecco un esempio di ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Esecutore

Successivamente, scrivi il codice dell'esecutore per il nuovo componente. Fondamentalmente, è necessario creare una nuova sottoclasse di base_executor.BaseExecutor con la sua funzione Do sovrascritta. Nella funzione Do , gli argomenti input_dict , output_dict e exec_properties passati nella mappano rispettivamente a INPUTS , OUTPUTS e PARAMETERS definiti in ComponentSpec. Per exec_properties , il valore può essere recuperato direttamente tramite una ricerca nel dizionario. Per gli artefatti in input_dict e output_dict , sono disponibili funzioni utili nella classe artifact_utils che possono essere utilizzate per recuperare l'istanza dell'artefatto o l'URI dell'artefatto.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Test unitario di un esecutore personalizzato

È possibile creare unit test per l'esecutore personalizzato in modo simile a questo .

Interfaccia dei componenti

Ora che la parte più complessa è stata completata, il passaggio successivo consiste nell'assemblare questi pezzi in un'interfaccia componente, per consentire l'utilizzo del componente in una pipeline. Ci sono diversi passaggi:

  • Rendere l'interfaccia del componente una sottoclasse di base_component.BaseComponent
  • Assegna una variabile di classe SPEC_CLASS con la classe ComponentSpec definita in precedenza
  • Assegna una variabile di classe EXECUTOR_SPEC alla classe Executor definita in precedenza
  • Definire la funzione di costruzione __init__() utilizzando gli argomenti della funzione per costruire un'istanza della classe ComponentSpec e invocare la super funzione con quel valore, insieme a un nome facoltativo

Quando viene creata un'istanza del componente, verrà richiamata la logica di controllo del tipo nella classe base_component.BaseComponent per garantire che gli argomenti passati siano compatibili con le informazioni sul tipo definite nella classe ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Assemblare in una pipeline TFX

L'ultimo passaggio consiste nel collegare il nuovo componente personalizzato a una pipeline TFX. Oltre ad aggiungere un'istanza del nuovo componente, sono necessari anche quanto segue:

  • Cablare correttamente i componenti a monte e a valle del nuovo componente. Ciò viene fatto facendo riferimento agli output del componente a monte nel nuovo componente e facendo riferimento agli output del nuovo componente nei componenti a valle
  • Aggiungi la nuova istanza del componente all'elenco dei componenti durante la costruzione della pipeline.

L'esempio seguente evidenzia le modifiche sopra menzionate. L'esempio completo è disponibile nel repository TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Distribuisci un componente completamente personalizzato

Oltre alle modifiche al codice, tutte le parti appena aggiunte ( ComponentSpec , Executor , interfaccia del componente) devono essere accessibili nell'ambiente di esecuzione della pipeline per poter eseguire correttamente la pipeline.