Tutorial sui componenti della funzione TFX PythonFX

Questo blocco appunti contiene esempi su come creare ed eseguire componenti di funzioni Python all'interno di TFX InteractiveContext e in una pipeline TFX orchestrata localmente.

Per ulteriori contesto e informazioni, vedere la componenti funzione personalizzata Python pagina del sito della documentazione TFX.

Impostare

Per prima cosa installeremo TFX e importeremo i moduli necessari. TFX richiede Python 3.

Controlla la versione di Python di sistema

import sys
sys.version
'3.7.5 (default, Feb 23 2021, 13:22:40) \n[GCC 8.4.0]'

Aggiorna Pip

Per evitare l'aggiornamento di Pip in un sistema durante l'esecuzione in locale, verifica che sia in esecuzione in Colab. I sistemi locali possono ovviamente essere aggiornati separatamente.

try:
  import colab
  !pip install --upgrade pip
except:
  pass

Installa TFX

pip install -U tfx

Hai riavviato il runtime?

Se stai utilizzando Google Colab, la prima volta che esegui la cella sopra, devi riavviare il runtime (Runtime > Riavvia runtime ...). Ciò è dovuto al modo in cui Colab carica i pacchetti.

Importa pacchetti

Importiamo TFX e ne controlliamo la versione.

# Check version
from tfx import v1 as tfx
tfx.__version__
'1.4.0'

Componenti della funzione Python personalizzati

In questa sezione creeremo componenti dalle funzioni Python. Non faremo alcun vero problema di ML: queste semplici funzioni vengono utilizzate solo per illustrare il processo di sviluppo dei componenti della funzione Python.

Vedi guida monocomponente a base funzione Python per più documentazione.

Crea componenti personalizzati Python

Iniziamo scrivendo una funzione che genera dei dati fittizi. Questo viene scritto nel proprio file di modulo Python.

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

Successivamente, scriviamo un secondo componente che utilizza i dati fittizi prodotti. Calcoleremo semplicemente l'hash dei dati e lo restituiremo.

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

Esegui nel notebook con InteractiveContext

Ora dimostreremo l'utilizzo dei nostri nuovi componenti in TFX InteractiveContext.

Per maggiori informazioni su cosa si può fare con il notebook TFX InteractiveContext, vedere l'in-notebook TFX Keras Component Tutorial .

from my_generator import MyGenerator
from my_consumer import MyConsumer

Costruisci l'InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/metadata.sqlite.

Eseguire il componente in modo interattivo con context.run()

Successivamente, abbiamo eseguito i nostri componenti in modo interattivo all'interno del notebook con context.run() . Il nostro componente consumer utilizza le uscite del componente generatore.

generator = MyGenerator()
context.run(generator)
WARNING: Logging before InitGoogleLogging() is written to STDERR
I1205 10:37:04.765872 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
I1205 10:37:04.808555 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Dopo l'esecuzione, possiamo ispezionare il contenuto dell'artefatto di output "hash" del componente consumer su disco.

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

Questo è tutto e ora hai scritto ed eseguito i tuoi componenti personalizzati!

Scrivi una definizione di pipeline

Successivamente, creeremo una pipeline utilizzando questi stessi componenti. Durante l'utilizzo del InteractiveContext all'interno di un notebook funziona bene per la sperimentazione, la definizione di una pipeline consente di distribuire la pipeline su guide locali o remoti per l'utilizzo di produzione.

Qui dimostreremo l'utilizzo di LocalDagRunner in esecuzione localmente sul tuo computer. Per l'esecuzione della produzione, le guide Airflow o Kubeflow potrebbero essere più adatte.

Costruisci una conduttura

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

Eseguire il gasdotto con l' LocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
I1205 10:37:04.983860 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.990442 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.996665 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.003470 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.013659 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.031374 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.048280 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.067972 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Possiamo ispezionare gli artefatti di output generati da questa esecuzione della pipeline.

find {PIPELINE_ROOT}
/tmp/tmpydmun02b
/tmp/tmpydmun02b/metadata.sqlite
/tmp/tmpydmun02b/MyConsumer
/tmp/tmpydmun02b/MyConsumer/.system
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution/2
/tmp/tmpydmun02b/MyConsumer/hash
/tmp/tmpydmun02b/MyConsumer/hash/2
/tmp/tmpydmun02b/MyConsumer/hash/2/value
/tmp/tmpydmun02b/MyGenerator
/tmp/tmpydmun02b/MyGenerator/data
/tmp/tmpydmun02b/MyGenerator/data/1
/tmp/tmpydmun02b/MyGenerator/data/1/data_file.txt
/tmp/tmpydmun02b/MyGenerator/.system
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution/1

Ora hai scritto i tuoi componenti personalizzati e orchestrato la loro esecuzione su LocalDagRunner! Per passi successivi, controlla tutorial e guide aggiuntive sul sito TFX .