Wstęp
Ten dokument zawiera instrukcje tworzenia potoku TensorFlow Extended (TFX) dla własnego zestawu danych przy użyciu szablonu pingwina, który jest dostarczany z pakietem TFX Python. Utworzony potok będzie początkowo korzystał z zestawu danych Palmer Penguins , ale przekształcimy potok dla Twojego zestawu danych.
Warunki wstępne
- Linux / MacOS
- Python 3,6-3,8
- Notatnik Jupytera
Krok 1. Skopiuj predefiniowany szablon do katalogu projektu.
W tym kroku stworzymy działający katalog i pliki projektu potoku, kopiując pliki z szablonu pingwina w TFX. Możesz myśleć o tym jako o rusztowaniu dla twojego projektu potoku TFX.
Zaktualizuj pip
Jeśli pracujemy w Colab, powinniśmy upewnić się, że mamy najnowszą wersję Pip. Systemy lokalne można oczywiście aktualizować osobno.
import sys
if 'google.colab' in sys.modules:
!pip install --upgrade pip
Zainstaluj wymagany pakiet
Najpierw zainstaluj TFX i TensorFlow Model Analysis (TFMA).
pip install -U tfx tensorflow-model-analysis
Sprawdźmy wersje TFX.
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1 TFMA version: 0.37.0 TFX version: 1.6.0
Jesteśmy gotowi do stworzenia potoku.
Ustaw PROJECT_DIR
na odpowiednie miejsce docelowe dla Twojego środowiska. Wartość domyślna to ~/imported/${PIPELINE_NAME}
, która jest odpowiednia dla środowiska Notatnika Google Cloud AI Platform .
Możesz nadać swojemu potoku inną nazwę, zmieniając poniższą nazwę PIPELINE_NAME
. Będzie to również nazwa katalogu projektu, w którym zostaną umieszczone twoje pliki.
PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)
Skopiuj pliki szablonów.
TFX zawiera szablon penguin
z pakietem TFX Python. szablon penguin
zawiera wiele instrukcji, jak wprowadzić zestaw danych do potoku, co jest celem tego samouczka.
Polecenie interfejsu wiersza polecenia tfx template copy
kopiuje wstępnie zdefiniowane pliki szablonów do katalogu projektu.
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
!tfx template copy \
--pipeline-name={PIPELINE_NAME} \
--destination-path={PROJECT_DIR} \
--model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin CLI Copying penguin pipeline template kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py __init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py model.py -> /home/kbuilder/imported/my_pipeline/models/model.py features.py -> /home/kbuilder/imported/my_pipeline/models/features.py features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py __init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py __init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
Zmień kontekst katalogu roboczego w tym notatniku na katalog projektu.
%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline
Przeglądaj skopiowane pliki źródłowe
Szablon TFX zawiera podstawowe pliki szkieletowe do zbudowania potoku, w tym kod źródłowy Pythona i przykładowe dane. Szablon penguin
wykorzystuje ten sam zbiór danych Palmer Penguins i model ML, co przykład Penguin .
Oto krótkie wprowadzenie do każdego z plików Pythona.
-
pipeline
— ten katalog zawiera definicję potoku-
configs.py
— definiuje wspólne stałe dla programów uruchamiających potok -
pipeline.py
— definiuje komponenty TFX i potok
-
-
models
— ten katalog zawiera definicje modeli ML-
features.py
,features_test.py
— definiuje cechy dla modelu -
preprocessing.py
,preprocessing_test.py
— definiuje procedury wstępnego przetwarzania danych -
constants.py
— definiuje stałe modelu -
model.py
,model_test.py
— definiuje model ML przy użyciu frameworków ML, takich jak TensorFlow
-
-
local_runner.py
— zdefiniuj runner dla środowiska lokalnego, który używa lokalnego silnika orkiestracji -
kubeflow_runner.py
— zdefiniuj moduł uruchamiający dla silnika orkiestracji Kubeflow Pipelines
Domyślnie szablon zawiera tylko standardowe komponenty TFX. Jeśli potrzebujesz niestandardowych akcji, możesz utworzyć niestandardowe składniki dla swojego potoku. Szczegółowe informacje można znaleźć w przewodniku po komponentach niestandardowych TFX .
Pliki testów jednostkowych.
Możesz zauważyć, że istnieją pliki z _test.py
w nazwie. Są to testy jednostkowe potoku i zaleca się dodanie większej liczby testów jednostkowych podczas implementowania własnych potoków. Testy jednostkowe można uruchamiać, podając nazwę modułu plików testowych z flagą -m
. Nazwę modułu można zwykle uzyskać, usuwając rozszerzenie .py
i zastępując /
.
. Na przykład:
import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python [ RUN ] FeaturesTest.testLabelKey INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s [ OK ] FeaturesTest.testLabelKey [ RUN ] FeaturesTest.test_session [ SKIPPED ] FeaturesTest.test_session ---------------------------------------------------------------------- Ran 2 tests in 0.001s OK (skipped=1)
Utwórz potok TFX w środowisku lokalnym.
TFX obsługuje kilka silników orkiestracji do uruchamiania potoków. Użyjemy lokalnego silnika orkiestracji. Lokalny aparat aranżacji działa bez żadnych dalszych zależności i jest odpowiedni do programowania i debugowania, ponieważ działa w środowisku lokalnym, a nie zależy od zdalnych klastrów obliczeniowych.
Użyjemy local_runner.py
do uruchomienia potoku za pomocą lokalnego programu Orchestrator. Musisz utworzyć potok przed jego uruchomieniem. Możesz utworzyć potok za pomocą polecenia pipeline create
.
tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI Creating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" created successfully.
Polecenie pipeline create
rejestruje potok zdefiniowany w local_runner.py
bez faktycznego uruchamiania go.
Utworzony potok uruchomisz za pomocą polecenia run create
w kolejnych krokach.
Krok 2. Pozyskaj SWOJE dane do potoku.
Początkowy potok pozyskuje zestaw danych pingwina, który jest zawarty w szablonie. Musisz umieścić swoje dane w potoku, a większość potoków TFX zaczyna się od komponentu ExampleGen.
Wybierz przykładową generację
Twoje dane mogą być przechowywane w dowolnym miejscu, do którego masz dostęp, w lokalnym lub rozproszonym systemie plików lub w systemie z możliwością zapytań. TFX zapewnia różne komponenty ExampleGen
, aby przenieść dane do potoku TFX. Możesz wybrać jeden z poniższych przykładowych elementów generujących.
- CsvExampleGen: odczytuje pliki CSV w katalogu. Używany w przykładzie pingwina i przykładu taksówki z Chicago .
- ImportExampleGen: przyjmuje pliki TFRecord w formacie danych Przykład TF. Używane w przykładach MNIST .
- FileBasedExampleGen dla formatu Avro lub Parquet .
- BigQueryExampleGen : odczytuje dane bezpośrednio w Google Cloud BigQuery. Używane w przykładach taksówek z Chicago .
Możesz także utworzyć własny ExampleGen, na przykład tfx zawiera niestandardowy ExecampleGen, który używa Presto jako źródła danych. Zobacz przewodnik , aby uzyskać więcej informacji o tym, jak używać i rozwijać niestandardowe executory.
Gdy zdecydujesz, którego przykładowego Gen użyć, będziesz musiał zmodyfikować definicję potoku, aby korzystać z danych.
Zmodyfikuj
DATA_PATH
wlocal_runner.py
i ustaw ją na lokalizację swoich plików.- Jeśli masz pliki w środowisku lokalnym, określ ścieżkę. Jest to najlepsza opcja do tworzenia lub debugowania potoku.
- Jeśli pliki są przechowywane w GCS, możesz użyć ścieżki zaczynającej się od
gs://{bucket_name}/...
. Upewnij się, że możesz uzyskać dostęp do GCS ze swojego terminala, na przykład za pomocągsutil
. W razie potrzeby postępuj zgodnie z przewodnikiem autoryzacji w Google Cloud . - Jeśli chcesz użyć obiektu ExampleGen opartego na zapytaniu, takiego jak BigQueryExampleGen, potrzebujesz instrukcji Query, aby wybrać dane ze źródła danych. Aby używać Google Cloud BigQuery jako źródła danych, musisz ustawić jeszcze kilka rzeczy.
- W
pipeline/configs.py
:- Zmień
GOOGLE_CLOUD_PROJECT
iGCS_BUCKET_NAME
na swój projekt GCP i nazwę zasobnika. Wiadro powinno istnieć przed uruchomieniem potoku. -
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS
zmienną BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS. - Odkomentuj i ustaw zmienną
BIG_QUERY_QUERY
w swoim zapytaniu .
- Zmień
- W
local_runner.py
:- Skomentuj argument
data_path
i usuń komentarz z argumentuquery
wpipeline.create_pipeline()
.
- Skomentuj argument
- W
pipeline/pipeline.py
:- Skomentuj argument
data_path
i odkomentuj argumentquery
wcreate_pipeline()
. - Użyj BigQueryExampleGen zamiast CsvExampleGen.
- Skomentuj argument
Zastąp istniejący CsvExampleGen swoją klasą ExampleGen w
pipeline/pipeline.py
. Każda klasa ExampleGen ma inny podpis. Więcej szczegółów można znaleźć w przewodniku po komponentach ExampleGen . Nie zapomnij zaimportować wymaganych modułów za pomocą instrukcjiimport
wpipeline/pipeline.py
.
Początkowy potok składa się z czterech komponentów: ExampleGen
, StatisticsGen
, SchemaGen
i ExampleValidator
. Nie musimy nic zmieniać dla StatisticsGen
, SchemaGen
i ExampleValidator
. Uruchommy potok po raz pierwszy.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:12.848598153 5127 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886553302 last_update_time_since_epoch: 1643886553302 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886556588 last_update_time_since_epoch: 1643886556588 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Powinieneś zobaczyć komunikat „Component ExampleValidator” został zakończony. jeśli potok został pomyślnie uruchomiony.
Zbadaj wydajność rurociągu.
Potok TFX generuje dwa rodzaje danych wyjściowych, artefakty i bazę danych metadanych (MLMD) , która zawiera metadane artefaktów i wykonań potoku. Lokalizacja wyjścia jest zdefiniowana w local_runner.py
. Domyślnie artefakty są przechowywane w katalogu tfx_pipeline_output
, a metadane są przechowywane jako baza danych sqlite w katalogu tfx_metadata
.
Możesz użyć interfejsów API MLMD do zbadania tych danych wyjściowych. Najpierw zdefiniujemy kilka funkcji narzędziowych do wyszukiwania artefaktów wyjściowych, które właśnie zostały utworzone.
import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils
# TODO(b/171447278): Move these functions into TFX library.
def get_latest_executions(store, pipeline_name, component_id = None):
"""Fetch all pipeline runs."""
if component_id is None: # Find entire pipeline runs.
run_contexts = [
c for c in store.get_contexts_by_type('run')
if c.properties['pipeline_name'].string_value == pipeline_name
]
else: # Find specific component runs.
run_contexts = [
c for c in store.get_contexts_by_type('component_run')
if c.properties['pipeline_name'].string_value == pipeline_name and
c.properties['component_id'].string_value == component_id
]
if not run_contexts:
return []
# Pick the latest run context.
latest_context = max(run_contexts,
key=lambda c: c.last_update_time_since_epoch)
return store.get_executions_by_context(latest_context.id)
def get_latest_artifacts(store, pipeline_name, component_id = None):
"""Fetch all artifacts from latest pipeline execution."""
executions = get_latest_executions(store, pipeline_name, component_id)
# Fetch all artifacts produced from the given executions.
execution_ids = [e.id for e in executions]
events = store.get_events_by_execution_ids(execution_ids)
artifact_ids = [
event.artifact_id for event in events
if event.type == metadata_store_pb2.Event.OUTPUT
]
return store.get_artifacts_by_id(artifact_ids)
def find_latest_artifacts_by_type(store, artifacts, artifact_type):
"""Get the latest artifacts of a specified type."""
# Get type information from MLMD
try:
artifact_type = store.get_artifact_type(artifact_type)
except errors.NotFoundError:
return []
# Filter artifacts with type.
filtered_artifacts = [aritfact for aritfact in artifacts
if aritfact.type_id == artifact_type.id]
# Convert MLMD artifact data into TFX Artifact instances.
return [artifact_utils.deserialize_artifact(artifact_type, artifact)
for artifact in filtered_artifacts]
from tfx.orchestration.experimental.interactive import visualizations
def visualize_artifacts(artifacts):
"""Visualizes artifacts using standard visualization modules."""
for artifact in artifacts:
visualization = visualizations.get_registry().get_visualization(
artifact.type_name)
if visualization:
visualization.display(artifact)
from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()
import pprint
from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts
def preview_examples(artifacts):
"""Preview a few records from Examples artifacts."""
pp = pprint.PrettyPrinter()
for artifact in artifacts:
print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
for split in artifact_utils.decode_split_names(artifact.split_names):
print("==== Reading from split:{}".format(split))
split_uri = artifact_utils.get_split_uri([artifact], split)
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(split_uri, name)
for name in os.listdir(split_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames,
compression_type="GZIP")
# Iterate over the first 2 records and decode them.
for tfrecord in dataset.take(2):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
import local_runner
metadata_connection_config = metadata.sqlite_metadata_connection_config(
local_runner.METADATA_PATH)
Teraz możemy odczytać metadane artefaktów wyjściowych z MLMD.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
# Search all aritfacts from the previous pipeline run.
artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
# Find artifacts of Examples type.
examples_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Examples.TYPE_NAME)
# Find artifacts generated from StatisticsGen.
stats_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ExampleStatistics.TYPE_NAME)
# Find artifacts generated from SchemaGen.
schema_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Schema.TYPE_NAME)
# Find artifacts generated from ExampleValidator.
anomalies_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ExampleAnomalies.TYPE_NAME)
Teraz możemy zbadać dane wyjściowe z każdego komponentu. Walidacja danych Tensorflow (TFDV) jest używana w StatisticsGen
, SchemaGen
i ExampleValidator
, a TFDV może służyć do wizualizacji danych wyjściowych z tych komponentów.
W tym samouczku użyjemy metod pomocniczych wizualizacji w TFX, które wewnętrznie używają TFDV do pokazania wizualizacji. Proszę zapoznać się z samouczkiem dotyczącym komponentów TFX, aby dowiedzieć się więcej o każdym komponencie.
Sprawdź formularz wyjściowy PrzykładGen
Przeanalizujmy dane wyjściowe z ExampleGen. Spójrz na pierwsze dwa przykłady dla każdego podziału:
preview_examples(examples_artifacts)
Domyślnie TFX ExampleGen dzieli przykłady na dwa podziały, train i eval , ale możesz dostosować konfigurację podziału .
Sprawdź dane wyjściowe z StatisticsGen
visualize_artifacts(stats_artifacts)
Statystyki te są dostarczane do SchemaGen w celu automatycznego skonstruowania schematu danych.
Sprawdź dane wyjściowe ze SchemaGen
visualize_artifacts(schema_artifacts)
Ten schemat jest automatycznie wywnioskowany z danych wyjściowych StatisticsGen. W tym samouczku użyjemy tego wygenerowanego schematu, ale możesz również modyfikować i dostosowywać schemat .
Sprawdź dane wyjściowe z ExampleValidator
visualize_artifacts(anomalies_artifacts)
Jeśli zostaną znalezione jakiekolwiek anomalie, możesz przejrzeć swoje dane, aby wszystkie przykłady były zgodne z Twoimi założeniami. Przydatne mogą być dane wyjściowe z innych komponentów, takich jak StatistcsGen. Znalezione anomalie nie blokują wykonywania potoku.
Możesz zobaczyć dostępne funkcje z danych wyjściowych SchemaGen
. Jeśli Twoje funkcje mogą być użyte do skonstruowania modelu ML bezpośrednio w Trainer
, możesz pominąć następny krok i przejść do kroku 4. W przeciwnym razie możesz wykonać pewne prace inżynieryjne w następnym kroku. Składnik Transform
jest potrzebny, gdy wymagane są operacje pełnego przebiegu, takie jak obliczanie średnich, zwłaszcza gdy trzeba skalować.
Krok 3. (Opcjonalnie) Inżynieria funkcji z komponentem Transform.
W tym kroku zdefiniujesz różne zadania inżynierii funkcji, które będą używane przez komponent Transform
w potoku. Więcej informacji można znaleźć w przewodniku po komponentach Transformacja .
Jest to konieczne tylko wtedy, gdy kod treningowy wymaga dodatkowych funkcji, które nie są dostępne w danych wyjściowych ExampleGen. W przeciwnym razie możesz szybko przejść do następnego kroku korzystania z Trainera.
Określ cechy modelu
models/features.py
zawiera stałe do definiowania cech modelu, w tym nazwy cech, rozmiar słownictwa i tak dalej. Domyślnie szablon penguin
ma dwa kosztanty, FEATURE_KEYS
i LABEL_KEY
, ponieważ nasz model penguin
rozwiązuje problem klasyfikacji przy użyciu nadzorowanego uczenia się, a wszystkie funkcje są ciągłymi cechami numerycznymi. Zobacz definicje funkcji z przykładu chicago taxi na inny przykład.
Zaimplementuj preprocessing do trenowania/serwowania w preprocessing_fn().
Rzeczywista inżynieria funkcji ma miejsce w funkcji preprocessing_fn()
w models/preprocessing.py
.
W preprocessing_fn
możesz zdefiniować szereg funkcji, które manipulują wejściowym dyktem tensorów, aby wytworzyć dyktat wyjściowy tensorów. Istnieją funkcje pomocnicze, takie jak scale_to_0_1
i compute_and_apply_vocabulary
w API TensorFlow Transform lub możesz po prostu użyć zwykłych funkcji TensorFlow. Domyślnie szablon penguin
zawiera przykładowe zastosowania funkcji tft.scale_to_z_score do normalizacji wartości funkcji.
Więcej informacji na temat tworzenia preprocessing_fn
można znaleźć w przewodniku po transformacji Tensflow .
Dodaj składnik Transform do potoku.
Jeśli preprocessing_fn jest gotowy, dodaj składnik Transform
do potoku.
- W pliku
pipeline/pipeline.py
usuń komentarz# components.append(transform)
, aby dodać komponent do potoku.
Możesz zaktualizować potok i uruchomić go ponownie.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 4 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:37.596944686 5287 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 4 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 4 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 5 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886578210 last_update_time_since_epoch: 1643886578210 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 5 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 5 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 6 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886581527 last_update_time_since_epoch: 1643886581527 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 6 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 6 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Jeśli potok został uruchomiony pomyślnie, powinien zostać wyświetlony komunikat „Przekształcanie składników zakończone”. gdzieś w dzienniku. Ponieważ składnik Transform
i składnik ExampleValidator
nie są od siebie zależne, kolejność wykonywania nie jest stała. To powiedziawszy, zarówno Transform
, jak i ExampleValidator
mogą być ostatnim składnikiem w wykonaniu potoku.
Sprawdź dane wyjściowe z Transform
Komponent Transform tworzy dwa rodzaje danych wyjściowych, wykres Tensorflow i przekształcone przykłady. Przekształcone przykłady to typ artefaktu Przykłady, który jest również wytwarzany przez ExampleGen, ale ten zawiera zamiast tego przekształcone wartości funkcji.
Możesz je zbadać tak, jak zrobiliśmy to w poprzednim kroku.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
# Search all aritfacts from the previous run of Transform component.
artifacts = get_latest_artifacts(metadata_handler.store,
PIPELINE_NAME, "Transform")
# Find artifacts of Examples type.
transformed_examples_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Examples.TYPE_NAME)
preview_examples(transformed_examples_artifacts)
Krok 4. Trenuj swój model za pomocą komponentu Trainer.
Zbudujemy model ML z wykorzystaniem komponentu Trainer
. Więcej informacji można znaleźć w przewodniku po komponentach trenera . Musisz podać swój kod modelu do komponentu Trainer.
Zdefiniuj swój model.
W szablonie pingwina, models.model.run_fn
, jest używany jako argument run_fn
dla komponentu Trainer
. Oznacza to, że run_fn()
w models/model.py
zostanie wywołana po uruchomieniu komponentu Trainer
. Możesz zobaczyć kod do skonstruowania prostego modelu DNN przy użyciu keras
API w danym kodzie. Zobacz TensorFlow 2.x w przewodniku TFX , aby uzyskać więcej informacji na temat używania Keras API w TFX.
W tym run_fn
należy zbudować model i zapisać go w katalogu wskazywanym przez fn_args.serving_model_dir
, który jest określony przez komponent. Możesz użyć innych argumentów w fn_args
, które są przekazywane do run_fn
. Zobacz powiązane kody, aby uzyskać pełną listę argumentów w fn_args
.
Zdefiniuj swoje cechy w models/features.py
i używaj ich w razie potrzeby. Jeśli przekształciłeś swoje elementy w kroku 3, powinieneś użyć przekształconych elementów jako danych wejściowych do swojego modelu.
Dodaj składnik Trainer do potoku.
Jeśli twój run_fn jest gotowy, dodaj składnik Trainer
do potoku.
- W pliku
pipeline/pipeline.py
usuń komentarz# components.append(trainer)
, aby dodać komponent do potoku.
Argumenty dla komponentu trenera mogą zależeć od tego, czy używasz komponentu Transform, czy nie.
- Jeśli NIE używasz komponentu
Transform
, nie musisz zmieniać argumentów. Jeśli używasz komponentu
Transform
, musisz zmienić argumenty podczas tworzenia instancji komponentuTrainer
.- Zmień argument
examples
naexamples=transform.outputs['transformed_examples'],
. Do szkolenia musimy używać przekształconych przykładów. - Dodaj argument
transform_graph
, taki jaktransform_graph=transform.outputs['transform_graph'],
. Ten wykres zawiera wykres TensorFlow dla operacji transformacji. - Po powyższych zmianach kod do tworzenia komponentu Trainera będzie wyglądał następująco.
# If you use a Transform component. trainer = Trainer( run_fn=run_fn, examples=transform.outputs['transformed_examples'], transform_graph=transform.outputs['transform_graph'], schema=schema_gen.outputs['schema'], ...
- Zmień argument
Możesz zaktualizować potok i uruchomić go ponownie.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 7 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:01.173700221 5436 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 7 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 7 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 8 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886601629 last_update_time_since_epoch: 1643886601629 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 8 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 8 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 9 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886605023 last_update_time_since_epoch: 1643886605023 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 9 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 9 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Po pomyślnym wykonaniu tego wykonania utworzyłeś i uruchomiłeś swój pierwszy potok TFX dla swojego modelu. Gratulacje!
Twój nowy model będzie zlokalizowany w jakimś miejscu w katalogu wyjściowym, ale byłoby lepiej mieć model w stałej lokalizacji lub usłudze poza potokiem TFX, który zawiera wiele wyników pośrednich. Jeszcze lepiej dzięki ciągłej ocenie zbudowanego modelu, co ma kluczowe znaczenie w systemach produkcyjnych ML. Zobaczymy, jak ciągła ocena i wdrożenia działają w TFX w następnym kroku.
Krok 5. (Opcjonalnie) Oceń model za pomocą narzędzia Evaluator i opublikuj za pomocą popychacza.
Komponent Evaluator
stale ocenia każdy zbudowany model z programu Trainer
, a Pusher
kopiuje model do predefiniowanej lokalizacji w systemie plików lub nawet do modeli Google Cloud AI Platform .
Dodaje składnik Evaluator do potoku.
W pliku pipeline/pipeline.py
:
-
# components.append(model_resolver)
, aby dodać najnowszy mechanizm rozpoznawania modelu do potoku. Narzędzie Evaluator może służyć do porównania modelu ze starym modelem podstawowym, który przeszedł pomyślnie narzędzie Evaluator w ostatnim przebiegu potoku.LatestBlessedModelResolver
wyszukuje najnowszy model, który przeszedł ocenę. - Ustaw odpowiednią
tfma.MetricsSpec
dla swojego modelu. Ocena może być inna dla każdego modelu ML. W szablonie pingwinaSparseCategoricalAccuracy
, ponieważ rozwiązujemy problem klasyfikacji wielokategorii. Musisz również określićtfma.SliceSpec
, aby przeanalizować model pod kątem określonych wycinków. Aby uzyskać więcej informacji, zobacz Przewodnik po komponentach oceniającego . - Usuń komentarz
# components.append(evaluator)
, aby dodać komponent do potoku.
Możesz zaktualizować potok i uruchomić go ponownie.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 10 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:24.894390124 5584 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 10 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 10 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 11 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886625515 last_update_time_since_epoch: 1643886625515 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 11 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 11 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 12 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886628941 last_update_time_since_epoch: 1643886628941 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 12 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 12 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Zbadaj wyniki Ewaluatora
Ten krok wymaga rozszerzenia notatnika TensorFlow Model Analysis (TFMA) Jupyter. Pamiętaj, że wersja rozszerzenia notebooka TFMA powinna być identyczna z wersją pakietu TFMA Python.
Następujące polecenie zainstaluje rozszerzenie notebooka TFMA z rejestru NPM. Może to potrwać kilka minut.
# Install TFMA notebook extension.
jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir] [--paths] [--json] [--debug] [subcommand] Jupyter: Interactive Computing positional arguments: subcommand the subcommand to launch optional arguments: -h, --help show this help message and exit --version show the versions of core jupyter packages and exit --config-dir show Jupyter config dir --data-dir show Jupyter data dir --runtime-dir show Jupyter runtime dir --paths show all Jupyter paths. Add --json for machine-readable format. --json output paths as machine-readable json --debug output debug information about paths Available subcommands: bundlerextension console dejavu execute kernel kernelspec migrate nbconvert nbextension notebook qtconsole run serverextension troubleshoot trust Jupyter command `jupyter-labextension` not found.
Jeśli instalacja została zakończona, załaduj ponownie przeglądarkę , aby rozszerzenie zaczęło działać.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
# Search all aritfacts from the previous pipeline run.
artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
model_evaluation_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ModelEvaluation.TYPE_NAME)
if model_evaluation_artifacts:
tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
tfma.view.render_slicing_metrics(tfma_result)
Dodaje składnik Pusher do potoku.
Jeśli model wygląda obiecująco, musimy go opublikować. Komponent Pusher może opublikować model w lokalizacji w systemie plików lub w modelach GCP AI Platform przy użyciu niestandardowego modułu wykonawczego .
Komponent Evaluator
stale ocenia każdy zbudowany model z programu Trainer
, a Pusher
kopiuje model do predefiniowanej lokalizacji w systemie plików lub nawet do modeli Google Cloud AI Platform .
- W
local_runner.py
ustawSERVING_MODEL_DIR
na katalog do opublikowania. - W pliku
pipeline/pipeline.py
usuń komentarz# components.append(pusher)
, aby dodać Pusher do potoku.
Możesz zaktualizować potok i uruchomić go ponownie.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 13 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:49.163841363 5734 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 13 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 13 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 14 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886649739 last_update_time_since_epoch: 1643886649739 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 14 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 14 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 15 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886653128 last_update_time_since_epoch: 1643886653128 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 15 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 15 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Powinieneś być w stanie znaleźć swój nowy model pod adresem SERVING_MODEL_DIR
.
Krok 6. (Opcjonalnie) Wdróż swój potok w Kubeflow Pipelines w GCP.
Jak wspomniano wcześniej, local_runner.py
jest dobry do debugowania lub celów programistycznych, ale nie jest najlepszym rozwiązaniem dla obciążeń produkcyjnych. W tym kroku wdrożymy potok w Kubeflow Pipelines w Google Cloud.
Przygotowanie
Potrzebujemy pakietu kfp
python i programu skaffold
, aby wdrożyć potok w klastrze Kubeflow Pipelines.
pip install --upgrade -q kfp
# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold
Musisz przenieść plik binarny skaffold
do miejsca, w którym twoja powłoka może go znaleźć. Lub możesz określić ścieżkę do skaffold podczas uruchamiania pliku binarnego tfx
z --skaffold-cmd
.
# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
Do uruchomienia potoku potrzebny jest również klaster Kubeflow Pipelines. Wykonaj kroki 1 i 2 w samouczku TFX on Cloud AI Platform Pipelines .
Gdy klaster będzie gotowy, otwórz pulpit nawigacyjny potoku, klikając opcję Otwórz pulpit nawigacyjny potoków na stronie Pipelines
w konsoli Google Cloud . Adres URL tej strony to ENDPOINT
, aby zażądać uruchomienia potoku. Wartość punktu końcowego to wszystko w adresie URL po https://, aż do googleusercontent.com włącznie. Umieść swój punkt końcowy w następującym bloku kodu.
ENDPOINT='' # Enter your ENDPOINT here.
Aby uruchomić nasz kod w klastrze Kubeflow Pipelines, musimy spakować nasz kod do obrazu kontenera. Obraz zostanie zbudowany automatycznie podczas wdrażania naszego potoku, a wystarczy ustawić nazwę i rejestr kontenerów dla obrazu. W naszym przykładzie użyjemy rejestru kontenerów Google i nazwiemy go tfx-pipeline
.
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
Ustaw lokalizację danych.
Twoje dane powinny być dostępne z klastra Kubeflow Pipelines. Jeśli używałeś danych w swoim środowisku lokalnym, może być konieczne przesłanie ich do pamięci zdalnej, takiej jak Google Cloud Storage. Na przykład możemy przesłać dane pingwinów do domyślnego zasobnika, który jest tworzony automatycznie po wdrożeniu klastra Kubeflow Pipelines, jak poniżej.
gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]... NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted
Zaktualizuj lokalizację danych przechowywaną w DATA_PATH
w kubeflow_runner.py
.
Jeśli używasz BigQueryExampleGen, nie musisz przesyłać pliku danych, ale upewnij się, że kubeflow_runner.py
używa tego samego query
i argumentu beam_pipeline_args
dla funkcji pipeline.create_pipeline()
.
Wdróż potok.
Jeśli wszystko jest gotowe, możesz utworzyć potok za pomocą polecenia tfx pipeline create
.
!tfx pipeline create \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
CLI [Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.
Teraz rozpocznij wykonanie z nowo utworzonym potoku za pomocą polecenia tfx run create
.
tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI Creating a run for pipeline: my_pipeline Failed to load kube config. Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn (self._dns_host, self.port), self.timeout, **extra_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 95, in create_connection raise err File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 85, in create_connection sock.connect(sa) ConnectionRefusedError: [Errno 111] Connection refused During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen chunked=chunked, File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request conn.request(method, url, **httplib_request_kw) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request super(HTTPConnection, self).request(method, url, body=body, headers=headers) File "/usr/lib/python3.7/http/client.py", line 1256, in request self._send_request(method, url, body, headers, encode_chunked) File "/usr/lib/python3.7/http/client.py", line 1302, in _send_request self.endheaders(body, encode_chunked=encode_chunked) File "/usr/lib/python3.7/http/client.py", line 1251, in endheaders self._send_output(message_body, encode_chunked=encode_chunked) File "/usr/lib/python3.7/http/client.py", line 1030, in _send_output self.send(msg) File "/usr/lib/python3.7/http/client.py", line 970, in send self.connect() File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect conn = self._new_conn() File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn self, "Failed to establish a new connection: %s" % e urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module> sys.exit(cli_group()) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__ return self.main(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main rv = self.invoke(ctx) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke return _process_result(sub_ctx.command.invoke(sub_ctx)) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke return _process_result(sub_ctx.command.invoke(sub_ctx)) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke return ctx.invoke(self.callback, **ctx.params) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke return callback(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func return ctx.invoke(f, obj, *args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke return callback(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run handler = handler_factory.create_handler(ctx.flags_dict) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 93, in create_handler return kubeflow_handler.KubeflowHandler(flags_dict) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__ namespace=self.flags_dict[labels.NAMESPACE]) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 197, in __init__ if not self._context_setting['namespace'] and self.get_kfp_healthz( File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 411, in get_kfp_healthz response = self._healthz_api.get_healthz() File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz return self.get_healthz_with_http_info(**kwargs) # noqa: E501 File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info collection_formats=collection_formats) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api _preload_content, _request_timeout, _host) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api _request_timeout=_request_timeout) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request headers=headers) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET query_params=query_params) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request headers=headers) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 75, in request method, url, fields=fields, headers=headers, **urlopen_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url return self.urlopen(method, url, **extra_kw) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen response = conn.urlopen(method, u.request_uri, **kw) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen **response_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen **response_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen **response_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2] File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/retry.py", line 592, in increment raise MaxRetryError(_pool, url, error or ResponseError(cause)) urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))
Możesz też uruchomić potok na pulpicie nawigacyjnym Kubeflow Pipelines. Nowe uruchomienie zostanie wyświetlone w obszarze Experiments
na pulpicie nawigacyjnym Kubeflow Pipelines. Kliknięcie w eksperyment pozwoli Ci monitorować postęp i wizualizować artefakty utworzone podczas wykonywania.
Jeśli interesuje Cię uruchomienie potoku w Kubeflow Pipelines, więcej instrukcji znajdziesz w samouczku TFX on Cloud AI Platform Pipelines .
Sprzątanie
Aby wyczyścić wszystkie zasoby Google Cloud użyte w tym kroku, możesz usunąć projekt Google Cloud użyty w samouczku.
Alternatywnie możesz wyczyścić poszczególne zasoby, odwiedzając każdą konsolę: