Lokalne budowanie rurociągu TFX

TFX ułatwia orkiestrację przepływu pracy w uczeniu maszynowym (ML) jako potoku, aby:

  • Zautomatyzuj proces uczenia maszynowego, który umożliwia regularne ponowne szkolenie, ocenę i wdrażanie modelu.
  • Twórz potoki uczenia maszynowego, które obejmują głęboką analizę wydajności modelu i weryfikację nowo wyszkolonych modeli, aby zapewnić wydajność i niezawodność.
  • Monitoruj dane szkoleniowe pod kątem anomalii i eliminuj odchylenia w obsłudze szkoleń
  • Zwiększ prędkość eksperymentowania, uruchamiając potok z różnymi zestawami hiperparametrów.

Typowy proces opracowywania potoku rozpoczyna się na komputerze lokalnym, od analizy danych i konfiguracji komponentów, przed wdrożeniem do produkcji. W tym przewodniku opisano dwa sposoby lokalnego budowania potoku.

  • Dostosuj szablon potoku TFX, aby dopasować go do potrzeb przepływu pracy ML. Szablony potoków TFX to wstępnie zbudowane przepływy pracy, które demonstrują najlepsze praktyki przy użyciu standardowych komponentów TFX.
  • Zbuduj potok za pomocą TFX. W tym przypadku definiujesz potok bez rozpoczynania od szablonu.

Podczas opracowywania potoku możesz go uruchomić za pomocą LocalDagRunner . Następnie, gdy komponenty potoku zostaną dobrze zdefiniowane i przetestowane, można użyć koordynatora klasy produkcyjnej, takiego jak Kubeflow lub Airflow.

Zanim zaczniesz

TFX to pakiet Pythona, więc będziesz musiał skonfigurować środowisko programistyczne Pythona, takie jak środowisko wirtualne lub kontener Docker. Następnie:

pip install tfx

Jeśli nie masz doświadczenia z potokami TFX, zanim przejdziesz dalej , dowiedz się więcej o podstawowych koncepcjach potoków TFX .

Utwórz potok przy użyciu szablonu

Szablony potoków TFX ułatwiają rozpoczęcie opracowywania potoków, udostępniając wstępnie utworzony zestaw definicji potoków, które można dostosować do swojego przypadku użycia.

W poniższych sekcjach opisano, jak utworzyć kopię szablonu i dostosować ją do własnych potrzeb.

Utwórz kopię szablonu potoku

  1. Zobacz listę dostępnych szablonów potoków TFX:

    tfx template list
    
  2. Wybierz szablon z listy

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Zastąp następujące elementy:

    • template : nazwa szablonu, który chcesz skopiować.
    • pipeline-name : Nazwa potoku, który ma zostać utworzony.
    • destination-path : Ścieżka, do której ma zostać skopiowany szablon.

    Dowiedz się więcej o poleceniu tfx template copy .

  3. Kopia szablonu potoku została utworzona w określonej ścieżce.

Zapoznaj się z szablonem potoku

Ta sekcja zawiera przegląd rusztowania utworzonego za pomocą szablonu.

  1. Przeglądaj katalogi i pliki, które zostały skopiowane do katalogu głównego potoku

    • Katalog potoków z
      • pipeline.py - definiuje potok i wyświetla listę używanych komponentów
      • configs.py — przechowuj szczegóły konfiguracji, takie jak miejsce, z którego pochodzą dane lub używany program Orchestrator
    • Katalog danych
      • Zwykle zawiera plik data.csv , który jest domyślnym źródłem dla ExampleGen . Możesz zmienić źródło danych w configs.py .
    • Katalog modeli zawierający kod przetwarzania wstępnego i implementacje modeli

    • Szablon kopiuje moduły DAG dla środowiska lokalnego i Kubeflow.

    • Niektóre szablony zawierają także notesy języka Python, dzięki którym można eksplorować dane i artefakty za pomocą metadanych uczenia maszynowego.

  2. Uruchom następujące polecenia w katalogu potoku:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    Polecenie tworzy uruchomienie potoku przy użyciu LocalDagRunner , który dodaje do potoku następujące katalogi:

    • Katalog tfx_metadata zawierający magazyn metadanych ML używany lokalnie.
    • Katalog tfx_pipeline_output , który zawiera pliki wyjściowe potoku.
  3. Otwórz plik potoku pipeline/configs.py i przejrzyj jego zawartość. Ten skrypt definiuje opcje konfiguracyjne używane przez potok i funkcje komponentu. W tym miejscu można określić takie elementy, jak lokalizacja źródła danych lub liczba kroków szkoleniowych w przebiegu.

  4. Otwórz plik potoku pipeline/pipeline.py i przejrzyj jego zawartość. Ten skrypt tworzy potok TFX. Początkowo potok zawiera tylko komponent ExampleGen .

    • Postępuj zgodnie z instrukcjami zawartymi w komentarzach TODO w pipeline.py , aby dodać więcej kroków do potoku.
  5. Otwórz plik local_runner.py i przejrzyj zawartość. Ten skrypt tworzy uruchomienie potoku i określa parametry przebiegu, takie jak data_path i preprocessing_fn .

  6. Przejrzałeś rusztowanie utworzone przez szablon i utworzyłeś uruchomienie potoku przy użyciu LocalDagRunner . Następnie dostosuj szablon do swoich wymagań.

Dostosuj swój rurociąg

W tej sekcji omówiono, jak rozpocząć dostosowywanie szablonu.

  1. Zaprojektuj swój rurociąg. Rusztowanie udostępniane przez szablon pomaga zaimplementować potok dla danych tabelarycznych przy użyciu standardowych komponentów TFX. Jeśli przenosisz istniejący przepływ pracy ML do potoku, może być konieczna zmiana kodu, aby w pełni wykorzystać standardowe komponenty TFX . Może być również konieczne utworzenie niestandardowych komponentów , które implementują funkcje, które są unikalne dla Twojego przepływu pracy lub które nie są jeszcze obsługiwane przez standardowe komponenty TFX.

  2. Po zaprojektowaniu potoku iteracyjnie dostosuj potok, korzystając z poniższego procesu. Zacznij od komponentu, który pozyskuje dane do potoku, którym jest zazwyczaj komponent ExampleGen .

    1. Dostosuj potok lub komponent do swojego przypadku użycia. Te dostosowania mogą obejmować zmiany takie jak:

      • Zmiana parametrów rurociągu.
      • Dodawanie komponentów do rurociągu lub ich usuwanie.
      • Wymiana źródła wprowadzania danych. Tym źródłem danych może być plik lub zapytania do usług takich jak BigQuery.
      • Zmiana konfiguracji komponentu w potoku.
      • Zmiana funkcji dostosowywania komponentu.
    2. Uruchom komponent lokalnie, używając skryptu local_runner.py lub innego odpowiedniego modułu uruchamiającego DAG, jeśli używasz innego koordynatora. Jeśli skrypt nie powiedzie się, usuń błąd i spróbuj ponownie uruchomić skrypt.

    3. Gdy to dostosowanie zacznie działać, przejdź do następnego dostosowania.

  3. Pracując iteracyjnie, możesz dostosować każdy krok przepływu pracy szablonu do swoich potrzeb.

Utwórz niestandardowy potok

Skorzystaj z poniższych instrukcji, aby dowiedzieć się więcej na temat tworzenia niestandardowego potoku bez użycia szablonu.

  1. Zaprojektuj swój rurociąg. Standardowe komponenty TFX zapewniają sprawdzoną funkcjonalność, która pomaga wdrożyć kompletny przepływ pracy ML. Jeśli przenosisz istniejący przepływ pracy ML do potoku, może być konieczna zmiana kodu, aby w pełni wykorzystać standardowe komponenty TFX. Może być również konieczne utworzenie niestandardowych komponentów , które implementują takie funkcje, jak powiększanie danych.

  2. Utwórz plik skryptu, aby zdefiniować potok, korzystając z poniższego przykładu. W tym przewodniku ten plik jest określany jako my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    W kolejnych krokach zdefiniujesz potok w create_pipeline i uruchomisz go lokalnie za pomocą lokalnego modułu uruchamiającego.

    Iteracyjnie utwórz potok, korzystając z następującego procesu.

    1. Dostosuj potok lub komponent do swojego przypadku użycia. Te dostosowania mogą obejmować zmiany takie jak:

      • Zmiana parametrów rurociągu.
      • Dodawanie komponentów do rurociągu lub ich usuwanie.
      • Zastępowanie pliku wejściowego danych.
      • Zmiana konfiguracji komponentu w potoku.
      • Zmiana funkcji dostosowywania komponentu.
    2. Uruchom komponent lokalnie, korzystając z lokalnego modułu uruchamiającego lub bezpośrednio uruchamiając skrypt. Jeśli skrypt nie powiedzie się, usuń błąd i spróbuj ponownie uruchomić skrypt.

    3. Gdy to dostosowanie zacznie działać, przejdź do następnego dostosowania.

    Zacznij od pierwszego węzła w przepływie pracy potoku. Zazwyczaj pierwszy węzeł pozyskuje dane do potoku.

  3. Dodaj pierwszy węzeł przepływu pracy do potoku. W tym przykładzie potok używa standardowego komponentu ExampleGen do ładowania pliku CSV z katalogu pod ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen tworzy serializowane rekordy przykładowe przy użyciu danych w pliku CSV w określonej ścieżce danych. Ustawiając parametr input_base komponentu CsvExampleGen w katalogu głównym danych.

  4. Utwórz katalog data w tym samym katalogu co my_pipeline.py . Dodaj mały plik CSV do katalogu data .

  5. Użyj następującego polecenia, aby uruchomić skrypt my_pipeline.py .

    python my_pipeline.py
    

    Wynik powinien wyglądać mniej więcej tak:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Kontynuuj iteracyjne dodawanie komponentów do potoku.