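Introduction
This document explains how to create a TensorFlow Extended (TFX) pipeline for your own dataset using the penguin template that ships with the TFX Python package. The pipeline you create initially uses the Palmer Penguins dataset, and the steps below adapt it to your dataset.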
Prerequisites
- Linux / macOS
- Python 3.6-3.8
- Jupyter Notebook
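As a quick sanity check of the Python prerequisite, the following minimal sketch tests whether an interpreter version falls inside the 3.6 to 3.8 range listed above. The helper name `is_supported_python` is hypothetical and is not part of TFX.

```python
import sys

# Supported range taken from the prerequisites list above.
MIN_VERSION = (3, 6)
MAX_VERSION = (3, 8)


def is_supported_python(version=None):
    """Return True if the (major, minor) version lies in the supported range.

    `version` is any sequence starting with (major, minor); it defaults to
    the running interpreter. This helper is illustrative, not a TFX API.
    """
    if version is None:
        version = sys.version_info
    major_minor = tuple(version[:2])
    return MIN_VERSION <= major_minor <= MAX_VERSION


print("Supported interpreter:", is_supported_python())
```

Running this cell before installing anything gives an early hint if the notebook kernel is outside the range this tutorial was written for.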
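Step 1. Copy the predefined template to your project directory.
In this step, we create a working pipeline project directory and its files by copying them from the penguin template in TFX. You can think of this as a scaffold for your TFX pipeline project.
Update Pip
If you are running in Colab, make sure you have the latest version of Pip. Local systems can of course be updated separately.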
import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip
Install required packages
First, install TFX and TensorFlow Model Analysis (TFMA).
pip install -U tfx tensorflow-model-analysis
Let's check the installed versions.
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0
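We are now ready to create a pipeline.
Set PROJECT_DIR to a destination that suits your environment. The default value is ~/imported/${PIPELINE_NAME}, which is appropriate for the Google Cloud AI Platform Notebook environment.
You can give the pipeline a different name by changing PIPELINE_NAME below. This also becomes the name of the project directory where your files are placed.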
PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)
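Copy template files.
TFX includes the penguin template in the TFX Python package. The penguin template contains many instructions for bringing your dataset into the pipeline, which is the purpose of this tutorial.
The tfx template copy CLI command copies the predefined template files into your project directory.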
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
!tfx template copy \
--pipeline-name={PIPELINE_NAME} \
--destination-path={PROJECT_DIR} \
--model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
CLI
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
Change the working directory context of this notebook to the project directory.
%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline
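Browse your copied source files
The TFX template provides basic scaffold files for building a pipeline, including Python source code and sample data. The penguin template uses the same Palmer Penguins dataset and ML model as the Penguin example.
Here is a brief introduction to each of the Python files.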
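- pipeline: this directory contains the definition of the pipeline
  - configs.py: defines common constants for pipeline runners
  - pipeline.py: defines TFX components and a pipeline
- models: this directory contains ML model definitions
  - features.py, features_test.py: define features for the model
  - preprocessing.py, preprocessing_test.py: define preprocessing routines for data
  - constants.py: defines constants of the model
  - model.py, model_test.py: define the ML model using an ML framework
- local_runner.py: defines a runner for the local environment, which uses the local orchestration engine
- kubeflow_runner.py: defines a runner for the Kubeflow Pipelines orchestration engine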
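To see this layout on disk, a small standard-library sketch such as the one below can walk the project directory and separate unit test files from the other sources. The helper names `list_python_files` and `split_tests` are hypothetical and not part of the template.

```python
import os


def list_python_files(project_dir):
    """Return sorted paths (relative to project_dir) of all .py files."""
    found = []
    for root, _dirs, files in os.walk(project_dir):
        for name in files:
            if name.endswith(".py"):
                full_path = os.path.join(root, name)
                found.append(os.path.relpath(full_path, project_dir))
    return sorted(found)


def split_tests(paths):
    """Separate unit test files (*_test.py) from the other source files."""
    tests = [p for p in paths if p.endswith("_test.py")]
    sources = [p for p in paths if not p.endswith("_test.py")]
    return sources, tests
```

In the notebook, `split_tests(list_python_files(PROJECT_DIR))` would return the two groups for the directory created in the copy step.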
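By default, the template includes only standard TFX components. If you need customized actions, you can create custom components for your pipeline. See the TFX custom component guide for details.
Unit test files.
You might notice that some files have _test.py in their names. These are unit tests of the pipeline, and we recommend adding more unit tests as you implement your own pipeline. You can run a unit test by passing the module name of the test file to the -m flag. You can usually get the module name by removing the .py extension and replacing / with . in the file path. For example: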
import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[ OK ] FeaturesTest.testLabelKey
[ RUN ] FeaturesTest.test_session
[ SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.001s
OK (skipped=1)
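The path-to-module rule described above (drop the .py extension, replace the path separator with a dot) can be sketched as a small helper. The function name `module_name_from_path` is hypothetical and only illustrates the rule.

```python
import os


def module_name_from_path(path):
    """Convert a relative source path into a dotted module name.

    The result is the form expected by `python -m`. Only .py files are
    accepted; anything else raises ValueError.
    """
    normalized = os.path.normpath(path)
    root, ext = os.path.splitext(normalized)
    if ext != ".py":
        raise ValueError("expected a .py file, got: {}".format(path))
    return root.replace(os.sep, ".")


print(module_name_from_path("models/features_test.py"))  # models.features_test
```

The returned string is what follows `-m` in the command shown earlier.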
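Create a TFX pipeline in the local environment.
TFX supports several orchestration engines for running pipelines. We will use the local orchestration engine. It runs without any further dependencies, and because it runs in your local environment rather than depending on remote computing clusters, it is well suited to development and debugging.
We will use local_runner.py to run your pipeline with the local orchestrator. You have to create a pipeline before running it. You can create a pipeline with the pipeline create command.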
tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.
The pipeline create command registers the pipeline defined in local_runner.py without actually running it.
In the following steps, you will run the created pipeline with the run create command.
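If you prefer to drive the CLI from a script instead of notebook magics, the two commands can be assembled as argument lists. This is a minimal sketch: it only builds and prints the commands, using the flags shown in this tutorial, and the helper names are hypothetical.

```python
import shlex


def pipeline_create_command(runner_path="local_runner.py", engine="local"):
    """Build the argument list for `tfx pipeline create`."""
    return ["tfx", "pipeline", "create",
            "--engine={}".format(engine),
            "--pipeline_path={}".format(runner_path)]


def run_create_command(pipeline_name, engine="local"):
    """Build the argument list for `tfx run create`."""
    return ["tfx", "run", "create",
            "--engine={}".format(engine),
            "--pipeline_name={}".format(pipeline_name)]


# Print the commands in a shell-safe form; nothing is executed here.
for command in (pipeline_create_command(), run_create_command("my_pipeline")):
    print(" ".join(shlex.quote(part) for part in command))
```

Each list could be passed to `subprocess.run(command, check=True)` from the project directory, assuming the `tfx` executable is on `PATH`.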
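Step 2. Ingest YOUR data into the pipeline.
The initial pipeline ingests the penguin dataset that is included in the template. You need to put your own data into the pipeline, and most TFX pipelines start with an ExampleGen component.
Choose an ExampleGen
Your data can be stored anywhere your pipeline can access it: on a local or distributed filesystem, or in a queryable system. TFX provides various ExampleGen components for bringing your data into a TFX pipeline. You can choose one of the following example-generating components.
- CsvExampleGen: reads CSV files in a directory. Used in the penguin example and the Chicago taxi example.
- ImportExampleGen: takes TFRecord files in the TF Example data format. Used in the MNIST examples.
- FileBasedExampleGen for Avro or Parquet format.
- BigQueryExampleGen: reads data in Google Cloud BigQuery directly. Used in the Chicago taxi examples.
You can also create your own ExampleGen. For example, tfx includes a custom ExampleGen that uses Presto as a data source. See the guide for more information on how to use and develop custom executors.
Once you have decided which ExampleGen to use, you need to modify the pipeline definition to use your data.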
- Modify DATA_PATH in local_runner.py and set it to the location of your files.
  - If you have files in your local environment, specify the path. This is the best option for developing or debugging a pipeline.
  - If the files are stored in GCS, you can use a path starting with gs://{bucket_name}/.... Make sure that you can access GCS from your terminal, for example with gsutil. Follow the authorization guide in Google Cloud if needed.
  - If you want to use a query-based ExampleGen such as BigQueryExampleGen, you need a query statement that selects data from the data source. A few more settings are needed to use Google Cloud BigQuery as a data source.
    - In pipeline/configs.py:
      - Change GOOGLE_CLOUD_PROJECT and GCS_BUCKET_NAME to your GCP project and bucket name. The bucket must exist before you run the pipeline.
      - Uncomment the BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS variable.
      - Uncomment the BIG_QUERY_QUERY variable and set it to your query statement.
    - In local_runner.py:
      - Comment out the data_path argument and uncomment the query argument instead in pipeline.create_pipeline().
    - In pipeline/pipeline.py:
      - Comment out the data_path argument and uncomment the query argument in create_pipeline().
      - Use BigQueryExampleGen instead of CsvExampleGen.
- Replace the existing CsvExampleGen with your ExampleGen class in pipeline/pipeline.py. Each ExampleGen class has a different signature. See the ExampleGen component guide for details. Don't forget to import the required modules with import statements in pipeline/pipeline.py.
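Before updating DATA_PATH, it can help to check which of the file-based cases above a value falls into. The sketch below is a standard-library illustration only: the helper name `describe_data_path` is hypothetical, and it does not verify that a GCS bucket exists or is accessible.

```python
import os


def describe_data_path(data_path):
    """Classify a DATA_PATH value as a GCS URI or a local directory.

    Returns "gcs" for paths starting with gs://, "local" for an existing
    local directory, and raises FileNotFoundError otherwise.
    """
    if data_path.startswith("gs://"):
        # Remote bucket: access still has to be checked separately,
        # for example with gsutil as described above.
        return "gcs"
    if os.path.isdir(data_path):
        return "local"
    raise FileNotFoundError("no such local directory: {}".format(data_path))
```

A failing check here is cheaper to debug than a failed ExampleGen execution later in the run.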
The initial pipeline consists of four components: ExampleGen, StatisticsGen, SchemaGen and ExampleValidator. Nothing needs to change for StatisticsGen, SchemaGen and ExampleValidator. Let's run the pipeline for the first time.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:12.848598153 5127 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886553302 last_update_time_since_epoch: 1643886553302 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886556588 last_update_time_since_epoch: 1643886556588 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" 
} } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
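You should see "Component ExampleValidator is finished." if the pipeline ran successfully.
Examine the output of the pipeline.
A TFX pipeline produces two kinds of output: artifacts, and a metadata DB (MLMD) that contains metadata about the artifacts and pipeline executions. The output locations are defined in local_runner.py. By default, artifacts are stored under the tfx_pipeline_output directory, and metadata is stored as a sqlite database under the tfx_metadata directory.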
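Because the metadata store is an ordinary SQLite file, the standard library is enough to confirm that it exists and holds tables. This is a minimal sketch: the helper name `list_tables` is hypothetical, and the table names it returns are whatever MLMD created, so none are assumed here.

```python
import os
import sqlite3


def list_tables(db_path):
    """Return the names of all tables in a SQLite database file."""
    if not os.path.exists(db_path):
        # sqlite3.connect() would otherwise silently create an empty file.
        raise FileNotFoundError(db_path)
    connection = sqlite3.connect(db_path)
    try:
        rows = connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        connection.close()
    return [name for (name,) in rows]
```

For the run shown in the log, the file would be `./tfx_metadata/my_pipeline/metadata.db`. For anything beyond a quick existence check, the MLMD APIs used in the rest of this section are the better tool.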
You can use the MLMD APIs to examine these outputs. First, we define some utility functions to search for the output artifacts that were just produced.
import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch executions from the latest pipeline run (or component run)."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  ]
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD
  try:
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Filter artifacts with type.
  filtered_artifacts = [artifact for artifact in artifacts
                        if artifact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
          for artifact in filtered_artifacts]

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()
import os
import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
                                        compression_type="GZIP")
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()
        example.ParseFromString(serialized_example)
        pp.pprint(example)

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(
    local_runner.METADATA_PATH)
Now we can read the metadata of the output artifacts from MLMD.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  # Find artifacts of Examples type.
  examples_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Examples.TYPE_NAME)
  # Find artifacts generated from StatisticsGen.
  stats_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ExampleStatistics.TYPE_NAME)
  # Find artifacts generated from SchemaGen.
  schema_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Schema.TYPE_NAME)
  # Find artifacts generated from ExampleValidator.
  anomalies_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ExampleAnomalies.TYPE_NAME)
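Now we can examine the outputs from each component. TensorFlow Data Validation (TFDV) is used in StatisticsGen, SchemaGen, and ExampleValidator, and TFDV can also be used to visualize the outputs from these components.
In this tutorial, we use the visualization helper methods in TFX, which call TFDV internally to render the visualizations. See the TFX components tutorial to learn more about each component.
Examine the output from ExampleGen
Let's examine the output from ExampleGen. Take a look at the first two examples of each split: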
preview_examples(examples_artifacts)
By default, TFX ExampleGen divides the examples into two splits, train and eval, but you can adjust the split configuration.
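To make the default split more concrete, here is a minimal sketch in plain Python of how a 2:1 hash-bucket split behaves. It mirrors the hash_buckets values (2 for train, 1 for eval) that appear in the pipeline log. The function name assign_split and the choice of MD5 as the hash are illustrative assumptions; this is not how TFX implements splitting internally.

```python
import hashlib

# Hypothetical split configuration mirroring the default output_config
# seen in the pipeline log: 2 hash buckets for train, 1 for eval.
SPLITS = [("train", 2), ("eval", 1)]


def assign_split(record: bytes, splits=SPLITS) -> str:
    """Deterministically map a record to a split name by hash bucket."""
    total_buckets = sum(buckets for _, buckets in splits)
    # A stable hash keeps the same record in the same split across runs.
    digest = hashlib.md5(record).hexdigest()
    bucket = int(digest, 16) % total_buckets
    # Walk the cumulative bucket ranges to find the owning split.
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always < total_buckets")


# Illustrative records only; real input would be rows of the CSV file.
records = [f"row-{i}".encode() for i in range(300)]
counts = {"train": 0, "eval": 0}
for record in records:
    counts[assign_split(record)] += 1
print(counts)  # roughly two thirds of the records land in "train"
```

Changing the bucket counts in SPLITS changes the ratio, which is the intuition behind adjusting the split configuration.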
Examine the output from StatisticsGen
visualize_artifacts(stats_artifacts)
These statistics are supplied to SchemaGen, which uses them to construct a schema of the data automatically.
Examine the output from SchemaGen
visualize_artifacts(schema_artifacts)
This schema is inferred automatically from the output of StatisticsGen. This tutorial uses the generated schema as is, but you can also modify and customize it.
Examine the output from ExampleValidator
visualize_artifacts(anomalies_artifacts)
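If any anomalies are found, review your data to check that all examples follow your assumptions. Outputs from other components such as StatisticsGen may be useful here. Anomalies that are found do not block the pipeline execution.
You can see the available features in the output of SchemaGen. If your features can be used to build an ML model in Trainer directly, you can skip the next step and go to Step 4. Otherwise, you can do some feature engineering work in the next step. The Transform component is needed when full-pass operations such as calculating averages are required, especially when you need to scale values.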
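Step 3. (Optional) Feature engineering with the Transform component.
In this step, you define the feature engineering jobs that the Transform component in the pipeline will run. See the Transform component guide for more information.
This is only necessary if your training code requires additional features that are not available in the output of ExampleGen. Otherwise, feel free to skip ahead to the next step, which uses Trainer.
Define the features of the model
models/features.py contains constants that define the features of the model, including feature names, vocabulary sizes, and so on. By default, the penguin template has two constants, FEATURE_KEYS and LABEL_KEY, because the penguin model solves a classification problem with supervised learning and all of its features are continuous numeric features. For another example, see the feature definitions in the Chicago taxi example.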
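For illustration, a features module of this shape might look like the sketch below. The column names are assumptions based on the Palmer Penguins dataset, and the transformed_name helper with its _xf suffix is a hypothetical naming convention. Check the generated models/features.py for the actual definitions.

```python
from typing import List

# Assumed column names from the Palmer Penguins dataset; verify against
# the generated models/features.py before relying on them.
FEATURE_KEYS: List[str] = [
    "culmen_length_mm",
    "culmen_depth_mm",
    "flipper_length_mm",
    "body_mass_g",
]

# Assumed label column: the penguin species to classify.
LABEL_KEY = "species"


def transformed_name(key: str) -> str:
    """Hypothetical helper that names a feature after transformation."""
    return key + "_xf"


print([transformed_name(key) for key in FEATURE_KEYS])
```

Keeping these names in one module lets the preprocessing code and the model code refer to the same keys.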
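Implement preprocessing for training / serving in preprocessing_fn().
The actual feature engineering happens in the preprocessing_fn() function in models/preprocessing.py.
In preprocessing_fn, you can define a series of functions that manipulate the input dict of tensors to produce the output dict of tensors. The TensorFlow Transform API provides helper functions such as scale_to_0_1 and compute_and_apply_vocabulary, and you can also use regular TensorFlow functions. By default, the penguin template includes example usages of the tft.scale_to_z_score function to normalize feature values.
See the TensorFlow Transform guide for more information about authoring preprocessing_fn.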
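The idea behind z-score scaling, and why it needs a full pass over the data, can be sketched in plain Python. This is not the TensorFlow Transform implementation: analyze_z_score and apply_z_score are hypothetical names, the _xf suffix is an assumed naming convention, and using the population standard deviation is an assumption made for the illustration.

```python
import statistics
from typing import Dict, List, Tuple


def analyze_z_score(values: List[float]) -> Tuple[float, float]:
    """Full-pass phase: compute mean and population std over all values."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return mean, std


def apply_z_score(value: float, mean: float, std: float) -> float:
    """Per-example phase: scale one value with the precomputed statistics."""
    # Guard against constant features, where the deviation is zero.
    return 0.0 if std == 0 else (value - mean) / std


# Illustrative values only, not taken from the real dataset.
raw: Dict[str, List[float]] = {"body_mass_g": [3000.0, 4000.0, 5000.0]}

transformed = {}
for key, values in raw.items():
    mean, std = analyze_z_score(values)
    transformed[key + "_xf"] = [apply_z_score(v, mean, std) for v in values]

print(transformed)
```

The statistics are computed once over the whole dataset and then applied to each example, which is why a per-example function alone cannot do this scaling.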
Add the Transform component to the pipeline.
Once your preprocessing_fn is ready, add the Transform component to the pipeline.
- In the pipeline/pipeline.py file, uncomment # components.append(transform) to add the component to the pipeline.
You can update the pipeline and run it again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 4 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:37.596944686 5287 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 4 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 4 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 5 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886578210 last_update_time_since_epoch: 1643886578210 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 5 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 5 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 6 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886581527 last_update_time_since_epoch: 1643886581527 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" 
} } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 6 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 6 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
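If the pipeline ran successfully, you should see "Component Transform is finished." somewhere in the log. Because the Transform component and the ExampleValidator component do not depend on each other, their order of execution is not fixed. As a result, either Transform or ExampleValidator can be the last component to run in a pipeline execution.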
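The ordering point can be illustrated with a small dependency check in plain Python. The dependency map below is an assumption about how the template wires the components (ExampleValidator reading statistics and schema, Transform reading examples and schema), and is_valid_order is a hypothetical helper, not part of TFX.

```python
from typing import Dict, List, Set

# Assumed upstream dependencies of each component in this pipeline.
UPSTREAM: Dict[str, Set[str]] = {
    "CsvExampleGen": set(),
    "StatisticsGen": {"CsvExampleGen"},
    "SchemaGen": {"StatisticsGen"},
    "ExampleValidator": {"StatisticsGen", "SchemaGen"},
    "Transform": {"CsvExampleGen", "SchemaGen"},
}


def is_valid_order(order: List[str], upstream=UPSTREAM) -> bool:
    """Return True if every component runs after all of its upstreams."""
    finished: Set[str] = set()
    for component in order:
        if not upstream[component] <= finished:
            return False
        finished.add(component)
    return len(finished) == len(upstream)


prefix = ["CsvExampleGen", "StatisticsGen", "SchemaGen"]
# Both orders satisfy the dependencies, so either component may run last.
print(is_valid_order(prefix + ["ExampleValidator", "Transform"]))
print(is_valid_order(prefix + ["Transform", "ExampleValidator"]))
```

Under these assumed dependencies both orders are valid, which is why the log may end with either component.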
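Examine the output from Transform
The Transform component creates two kinds of outputs: a TensorFlow graph and transformed examples. The transformed examples are of the Examples artifact type, the same type that ExampleGen produces, but they contain transformed feature values instead.
You can examine them as we did in the previous step.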
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous run of Transform component.
  artifacts = get_latest_artifacts(metadata_handler.store,
                                   PIPELINE_NAME, "Transform")
  # Find artifacts of Examples type.
  transformed_examples_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Examples.TYPE_NAME)

preview_examples(transformed_examples_artifacts)
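Step 4. Train your model with the Trainer component.
We will build an ML model using the Trainer component. See the Trainer component guide for more information. You need to provide your model code to the Trainer component.
Define your model.
In the penguin template, models.model.run_fn is used as the run_fn argument of the Trainer component. This means that the run_fn() function in models/model.py is called when the Trainer component runs. The provided code shows how to construct a simple DNN model using the keras API. See the TensorFlow 2.x in TFX guide for more information about using the keras API in TFX.
In this run_fn, you should build a model and save it to the directory pointed to by fn_args.serving_model_dir, which is specified by the component. You can also use the other arguments in fn_args, which is passed into run_fn. See the related code for the full list of arguments in fn_args.
Define your features in models/features.py and use them as needed. If you transformed your features in Step 3, you should use the transformed features as inputs to your model.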
Add the Trainer component to the pipeline.
Once your run_fn is ready, add the Trainer component to the pipeline.
- In the pipeline/pipeline.py file, uncomment # components.append(trainer) to add the component to the pipeline.
The arguments for the Trainer component depend on whether you use the Transform component.
- If you do NOT use the Transform component, you don't need to change the arguments.
- If you use the Transform component, you need to change the arguments when creating the Trainer component instance.
  - Change the examples argument to examples=transform.outputs['transformed_examples'], because the transformed examples must be used for training.
  - Add the transform_graph argument, like transform_graph=transform.outputs['transform_graph'],. This graph contains the TensorFlow graph for the transform operations.
  - After the above changes, the code that creates the Trainer component will look like the following.
    # If you use a Transform component.
    trainer = Trainer(
        run_fn=run_fn,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        ...
You can update the pipeline and run it again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 7 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:01.173700221 5436 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 7 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 7 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 8 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886601629 last_update_time_since_epoch: 1643886601629 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 8 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 8 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 9 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886605023 last_update_time_since_epoch: 1643886605023 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" 
} } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 9 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 9 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
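If this run completes successfully, you have created and run your first TFX pipeline for your model. Congratulations!
Your new model is located somewhere under the output directory, but it is better to keep the model in a fixed location or service outside the TFX pipeline output, which holds many intermediate results. It is better still to evaluate the built model continuously, which is critical in production ML systems. The next step shows how continuous evaluation and deployment work in TFX.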
Step 5. (Optional) Evaluate the model with Evaluator and publish it with Pusher.
The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the file system or, optionally, to Google Cloud AI Platform Models.
Add the Evaluator component to the pipeline.
In the pipeline/pipeline.py file:
- Uncomment # components.append(model_resolver) to add the latest model resolver to the pipeline. Evaluator can be used to compare a model with the old baseline model that passed Evaluator in the last pipeline run. LatestBlessedModelResolver finds the latest model that passed Evaluator.
- Set a proper tfma.MetricsSpec for your model. Evaluation may differ for each ML model. The penguin template uses SparseCategoricalAccuracy because it solves a multi-class classification problem. To analyze the model on specific slices, you also need to specify tfma.SlicingSpec. For details, see the Evaluator component guide.
- Uncomment # components.append(evaluator) to add the component to the pipeline.
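As an illustration of the second item, an evaluation config for a multi-class classifier might look roughly like the sketch below. It is not copied from the template: the function name build_eval_config, the label key "species", and the 0.6 lower bound are assumptions chosen for this example, and slices are expressed here with tfma.SlicingSpec. TFMA is imported inside the function only so that the snippet can be loaded in an environment where TFMA is not installed.

```python
def build_eval_config(label_key="species", accuracy_lower_bound=0.6):
    """Return a tfma.EvalConfig for a multi-class classifier (illustrative sketch).

    `label_key` and `accuracy_lower_bound` are assumptions for this example;
    adjust them to your own dataset and quality bar.
    """
    # Imported lazily so that merely defining this function does not require TFMA.
    import tensorflow_model_analysis as tfma

    return tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key=label_key)],
        # An empty SlicingSpec means "the whole dataset". Passing
        # feature_keys=[...] to SlicingSpec would add per-feature slices.
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(metrics=[
                tfma.MetricConfig(
                    class_name="SparseCategoricalAccuracy",
                    threshold=tfma.MetricThreshold(
                        # Absolute bar: accuracy must reach the lower bound.
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={"value": accuracy_lower_bound}),
                        # Relative bar: compare against the baseline model
                        # supplied by the model resolver.
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={"value": -1e-10})))
            ])
        ])
```

The value threshold sets an absolute quality bar, while the change threshold is what ties the evaluation to the baseline model found by the resolver.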
You can now update the pipeline and run it again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 10 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:24.894390124 5584 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 10 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 10 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 11 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886625515 last_update_time_since_epoch: 1643886625515 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 11 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 11 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 12 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886628941 last_update_time_since_epoch: 1643886628941 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: 
"Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 12 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 12 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
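Examine the output of Evaluator
This step requires the TensorFlow Model Analysis (TFMA) Jupyter notebook extension. Note that the version of the TFMA notebook extension must be identical to the version of the TFMA Python package.
The following command installs the TFMA notebook extension from the NPM registry. It may take several minutes to complete.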
# Install TFMA notebook extension.
!jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir] [--paths] [--json] [--debug] [subcommand] Jupyter: Interactive Computing positional arguments: subcommand the subcommand to launch optional arguments: -h, --help show this help message and exit --version show the versions of core jupyter packages and exit --config-dir show Jupyter config dir --data-dir show Jupyter data dir --runtime-dir show Jupyter runtime dir --paths show all Jupyter paths. Add --json for machine-readable format. --json output paths as machine-readable json --debug output debug information about paths Available subcommands: bundlerextension console dejavu execute kernel kernelspec migrate nbconvert nbextension notebook qtconsole run serverextension troubleshoot trust Jupyter command `jupyter-labextension` not found.
After the installation completes, reload your browser so that the extension takes effect.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ModelEvaluation.TYPE_NAME)

# Render the slicing metrics only if the run produced an evaluation artifact.
if model_evaluation_artifacts:
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
  tfma.view.render_slicing_metrics(tfma_result)
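Add the Pusher component to the pipeline.
If the model looks promising, the next step is to publish it. The Pusher component can publish the model to a location in the file system, or to GCP AI Platform Models by using a custom executor.
The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the file system or to Google Cloud AI Platform Models.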
- In local_runner.py, set SERVING_MODEL_DIR to the directory where the model should be published.
- In pipeline/pipeline.py, uncomment # components.append(pusher) to add Pusher to the pipeline.
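The first of the two edits above amounts to choosing a directory path. The sketch below shows one way to derive such a path with the standard library. The helper name serving_model_dir, the OUTPUT_DIR value, and the directory layout are assumptions made for illustration and may not match the constants defined in the template's local_runner.py.

```python
import os

# Hypothetical values for illustration only; the template's local_runner.py
# defines its own constants, which may differ from these.
OUTPUT_DIR = "."
PIPELINE_NAME = "my_pipeline"


def serving_model_dir(output_dir: str, pipeline_name: str) -> str:
    """Build a per-pipeline directory path where Pusher could publish models."""
    return os.path.join(output_dir, "tfx_pipeline_output", pipeline_name,
                        "serving_model")


SERVING_MODEL_DIR = serving_model_dir(OUTPUT_DIR, PIPELINE_NAME)
print(SERVING_MODEL_DIR)
```

Keeping the serving directory under a per-pipeline folder avoids mixing models pushed by different pipelines.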
You can now update the pipeline and run it again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 13 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:49.163841363 5734 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 13 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 13 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 14 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886649739 last_update_time_since_epoch: 1643886649739 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 14 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 14 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 15 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886653128 last_update_time_since_epoch: 1643886653128 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: 
"Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 15 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 15 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
You should be able to find the new model in SERVING_MODEL_DIR.
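To check which model was pushed most recently, you can list the serving directory. The sketch below assumes that each pushed model sits in its own subdirectory whose name is an integer, and that a larger number means a newer model. That layout is an assumption here, and the helper name latest_pushed_model is hypothetical.

```python
import os
from typing import Optional


def latest_pushed_model(serving_model_dir: str) -> Optional[str]:
    """Return the path of the newest version directory, or None if there is none.

    Assumes version directories have integer names and that a larger
    number means a newer model.
    """
    if not os.path.isdir(serving_model_dir):
        return None
    versions = [
        name for name in os.listdir(serving_model_dir)
        if name.isdigit()
        and os.path.isdir(os.path.join(serving_model_dir, name))
    ]
    if not versions:
        return None
    # Compare numerically so that "10" sorts after "9".
    return os.path.join(serving_model_dir, max(versions, key=int))
```

Calling latest_pushed_model(SERVING_MODEL_DIR) after a run returns None when nothing has been pushed, which can happen if Evaluator did not bless the model.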
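Step 6. (Optional) Deploy the pipeline to Kubeflow Pipelines on GCP.
As mentioned earlier, local_runner.py is fine for debugging and development, but it is not the best solution for production workloads. In this step, we deploy the pipeline to Kubeflow Pipelines on Google Cloud.
Preparation
To deploy a pipeline to a Kubeflow Pipelines cluster, you need the kfp Python package and the skaffold program.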
!pip install --upgrade -q kfp
# Download skaffold and set it executable.
!curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold
You need to move the skaffold binary to a location where your shell can find it. Alternatively, you can specify the path to skaffold with the --skaffold-cmd flag when you run the tfx binary.
# Move skaffold binary into your path
!mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
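Before deploying, it can be worth confirming that the shell will actually find skaffold. The following sketch uses only the standard library. The helper name find_executable and the extra directory are illustrative assumptions, and the code does not invoke skaffold itself.

```python
import os
import shutil
from typing import Optional, Sequence


def find_executable(name: str, extra_dirs: Sequence[str] = ()) -> Optional[str]:
    """Search PATH plus extra_dirs for an executable and return its full path."""
    dirs = [d for d in os.environ.get("PATH", "").split(os.pathsep) if d]
    dirs.extend(extra_dirs)
    return shutil.which(name, path=os.pathsep.join(dirs))


# ~/.local/bin is only an example of an extra directory to search.
skaffold_path = find_executable(
    "skaffold", extra_dirs=(os.path.expanduser("~/.local/bin"),))
if skaffold_path is None:
    print("skaffold not found; move it onto PATH or pass --skaffold-cmd")
else:
    print(f"skaffold found at {skaffold_path}")
```

If the lookup fails, the mv target directory probably does not exist in your environment, so either create it first or choose a directory that is already on PATH.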
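You also need a Kubeflow Pipelines cluster to run the pipeline. Follow Steps 1 and 2 of the TFX on Cloud AI Platform Pipelines tutorial.
When your cluster is ready, open the pipeline dashboard by clicking Open Pipelines Dashboard on the Pipelines page of the Google Cloud console. The URL of that page is the ENDPOINT used to request a pipeline run. The endpoint value is everything in the URL after https://, up to and including googleusercontent.com. Put your endpoint in the following code block.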
ENDPOINT='' # Enter your ENDPOINT here.
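Rather than trimming the URL by hand, you can extract the endpoint from the dashboard URL programmatically. This is a minimal sketch: the helper name endpoint_from_dashboard_url is hypothetical and the sample URL is made up, not a real cluster.

```python
from urllib.parse import urlparse


def endpoint_from_dashboard_url(url: str) -> str:
    """Return the host part of a dashboard URL, without scheme, path or fragment."""
    host = urlparse(url).netloc
    if not host.endswith("googleusercontent.com"):
        raise ValueError(f"unexpected dashboard host: {host!r}")
    return host


# Made-up example URL for illustration only.
sample_url = "https://0123abcd-dot-us-central1.pipelines.googleusercontent.com/#/pipelines"
print(endpoint_from_dashboard_url(sample_url))
```

The host check makes the function fail loudly when a URL without the https:// prefix, or an unrelated page, is pasted in by mistake.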
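To run your code in a Kubeflow Pipelines cluster, the code must be packed into a container image. The image is built automatically while the pipeline is deployed, so you only need to set a name and a container registry for the image. In this example, we use Google Container Registry and name the image tfx-pipeline.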
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
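Set the data location.
Your data must be accessible from the Kubeflow Pipelines cluster. If you have been using data in your local environment, you might need to upload it to remote storage such as Google Cloud Storage. For example, you can upload the penguin data to the default bucket, which is created automatically when a Kubeflow Pipelines cluster is deployed, as follows.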
!gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]... NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted
Update the data location stored in DATA_PATH in kubeflow_runner.py.
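The value to put in DATA_PATH follows the same naming pattern as the upload command above. The sketch below builds that URI from a project ID. The helper name default_bucket_data_path is hypothetical, and the project ID shown is a placeholder.

```python
def default_bucket_data_path(project_id: str,
                             subdir: str = "tfx-template/data/penguin/") -> str:
    """Build the gs:// URI of a folder in the cluster's default bucket."""
    if not project_id:
        raise ValueError("project_id must not be empty")
    return f"gs://{project_id}-kubeflowpipelines-default/{subdir.lstrip('/')}"


# "my-project" is a placeholder; in the notebook you would pass
# GOOGLE_CLOUD_PROJECT instead.
print(default_bucket_data_path("my-project"))
```

Building the path from the project ID keeps DATA_PATH consistent with the bucket that gsutil uploaded to.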
If you are using BigQueryExampleGen, there is no need to upload the data file, but make sure that kubeflow_runner.py passes the same query and beam_pipeline_args arguments to the pipeline.create_pipeline() function.
Deploy the pipeline.
When everything is ready, you can create the pipeline with the tfx pipeline create command.
!tfx pipeline create \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-image
Note that the tfx CLI no longer accepts the --build-target-image flag that earlier versions of this command used. Specify the build target image in the KubeflowDagRunnerConfig class instead, and pass the --build-image flag without an argument to build a container image when creating or updating a pipeline.
Next, start a run of the newly created pipeline with the tfx run create command.
!tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI Creating a run for pipeline: my_pipeline Failed to load kube config. Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn (self._dns_host, self.port), self.timeout, **extra_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 95, in create_connection raise err File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 85, in create_connection sock.connect(sa) ConnectionRefusedError: [Errno 111] Connection refused During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen chunked=chunked, File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request conn.request(method, url, **httplib_request_kw) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request super(HTTPConnection, self).request(method, url, body=body, headers=headers) File "/usr/lib/python3.7/http/client.py", line 1256, in request self._send_request(method, url, body, headers, encode_chunked) File "/usr/lib/python3.7/http/client.py", line 1302, in _send_request self.endheaders(body, encode_chunked=encode_chunked) File "/usr/lib/python3.7/http/client.py", line 1251, in endheaders self._send_output(message_body, encode_chunked=encode_chunked) File "/usr/lib/python3.7/http/client.py", line 1030, in _send_output self.send(msg) File "/usr/lib/python3.7/http/client.py", line 970, in send self.connect() File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect conn = self._new_conn() File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn self, "Failed to establish a new connection: %s" % e 
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module> sys.exit(cli_group()) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__ return self.main(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main rv = self.invoke(ctx) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke return _process_result(sub_ctx.command.invoke(sub_ctx)) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke return _process_result(sub_ctx.command.invoke(sub_ctx)) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke return ctx.invoke(self.callback, **ctx.params) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke return callback(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func return ctx.invoke(f, obj, *args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke return callback(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run handler = handler_factory.create_handler(ctx.flags_dict) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 93, in create_handler return kubeflow_handler.KubeflowHandler(flags_dict) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__ namespace=self.flags_dict[labels.NAMESPACE]) File 
"/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 197, in __init__ if not self._context_setting['namespace'] and self.get_kfp_healthz( File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 411, in get_kfp_healthz response = self._healthz_api.get_healthz() File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz return self.get_healthz_with_http_info(**kwargs) # noqa: E501 File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info collection_formats=collection_formats) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api _preload_content, _request_timeout, _host) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api _request_timeout=_request_timeout) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request headers=headers) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET query_params=query_params) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request headers=headers) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 75, in request method, url, fields=fields, headers=headers, **urlopen_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url return self.urlopen(method, url, **extra_kw) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen response = conn.urlopen(method, u.request_uri, **kw) File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen **response_kw File 
"/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen **response_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen **response_kw File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2] File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/retry.py", line 592, in increment raise MaxRetryError(_pool, url, error or ResponseError(cause)) urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))
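Alternatively, you can run the pipeline from the Kubeflow Pipelines dashboard. The new run is listed under Experiments in the Kubeflow Pipelines dashboard. Clicking into the experiment lets you monitor progress and visualize the artifacts created during the run.
If you are interested in running your pipeline on Kubeflow Pipelines, see the TFX on Cloud AI Platform Pipelines tutorial for detailed instructions.
Cleaning up
To clean up all Google Cloud resources used in this step, delete the Google Cloud project you used for the tutorial.
Alternatively, you can clean up individual resources by visiting each console.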