التحقق من صحة البيانات باستخدام خط أنابيب TFX والتحقق من صحة بيانات TensorFlow

في هذا البرنامج التعليمي المستند إلى الكمبيوتر المحمول ، سننشئ خطوط أنابيب TFX ونشغلها للتحقق من صحة بيانات الإدخال وإنشاء نموذج ML. ويستند هذا الكمبيوتر الدفتري على خط أنابيب TFX بنينا في بسيط TFX خط أنابيب التعليمي . إذا لم تكن قد قرأت هذا البرنامج التعليمي حتى الآن ، فيجب عليك قراءته قبل المتابعة مع دفتر الملاحظات هذا.

تتمثل المهمة الأولى في أي مشروع علم بيانات أو مشروع ML في فهم البيانات وتنظيفها ، والتي تشمل:

  • فهم أنواع البيانات والتوزيعات والمعلومات الأخرى (على سبيل المثال ، متوسط ​​القيمة أو عدد العناصر الفريدة) حول كل ميزة
  • إنشاء مخطط أولي يصف البيانات
  • تحديد الانحرافات والقيم المفقودة في البيانات فيما يتعلق بمخطط معين

في هذا البرنامج التعليمي ، سننشئ خطي أنابيب TFX.

أولاً ، سننشئ خط أنابيب لتحليل مجموعة البيانات وإنشاء مخطط أولي لمجموعة البيانات المحددة. وسيشمل هذا الخط اثنين من عناصر جديدة، StatisticsGen و SchemaGen .

بمجرد أن يكون لدينا مخطط مناسب للبيانات ، سننشئ خط أنابيب لتدريب نموذج تصنيف ML بناءً على خط الأنابيب من البرنامج التعليمي السابق. في هذا الخط، وسوف نستخدم مخطط من خط الأنابيب الأول والعنصر الجديد، ExampleValidator ، للتحقق من صحة البيانات المدخلة.

المكونات الثلاثة الجديدة، StatisticsGen، SchemaGen وExampleValidator، هي مكونات TFX لتحليل البيانات والتحقق من صحة، وتنفيذها باستخدام TensorFlow بيانات التحقق من صحة مكتبة.

يرجى الاطلاع على فهم TFX خطوط الأنابيب لمعرفة المزيد عن مفاهيم مختلفة في TFX.


نحتاج أولاً إلى تثبيت حزمة TFX Python وتنزيل مجموعة البيانات التي سنستخدمها لنموذجنا.

ترقية النقطة

لتجنب ترقية Pip في نظام عند التشغيل محليًا ، تحقق للتأكد من أننا نعمل في Colab. يمكن بالطبع ترقية الأنظمة المحلية بشكل منفصل.

  import colab
  !pip install --upgrade pip

قم بتثبيت TFX

pip install -U tfx

هل أعدت تشغيل وقت التشغيل؟

إذا كنت تستخدم Google Colab ، في المرة الأولى التي تقوم فيها بتشغيل الخلية أعلاه ، يجب إعادة تشغيل وقت التشغيل بالنقر فوق الزر "RESTART RUNTIME" أعلاه أو باستخدام قائمة "Runtime> Restart runtime ...". هذا بسبب الطريقة التي يقوم بها كولاب بتحميل الحزم.

تحقق من إصدارات TensorFlow و TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0

قم بإعداد المتغيرات

هناك بعض المتغيرات المستخدمة لتحديد خط الأنابيب. يمكنك تخصيص هذه المتغيرات كما تريد. بشكل افتراضي ، سيتم إنشاء كل الإخراج من خط الأنابيب ضمن الدليل الحالي.

import os

# We will create two pipelines. One for schema generation and one for training.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

تحضير البيانات النموذجية

سنقوم بتنزيل نموذج مجموعة البيانات لاستخدامه في خط أنابيب TFX الخاص بنا. مجموعة البيانات التي نستخدمها هي بالمر البطاريق مجموعة البيانات التي تستخدم أيضا في غيرها من الأمثلة TFX .

توجد أربع سمات رقمية في مجموعة البيانات هذه:

  • culmen_length_mm
  • culmen_depth_mm
  • الزعنفة_length_mm
  • body_mass_g

تم بالفعل تسوية جميع الميزات ليكون لها نطاق [0،1]. سوف نبني نموذج التصنيف الذي يتنبأ species من طيور البطريق.

نظرًا لأن مكون TFX ExampleGen يقرأ المدخلات من دليل ، نحتاج إلى إنشاء دليل ونسخ مجموعة البيانات إليه.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmp/tfx-datan3p7t1d2/data.csv', <http.client.HTTPMessage at 0x7f8d2f9f9110>)

ألق نظرة سريعة على ملف CSV.

head {_data_filepath}

يجب أن تكون قادرًا على رؤية خمسة أعمدة للميزات. species هي واحدة من 0 أو 1 أو 2، ويجب على جميع الميزات الأخرى لديها قيم بين 0 و 1. ونحن سوف إنشاء خط أنابيب TFX لتحليل هذه البيانات.

قم بإنشاء مخطط أولي

يتم تعريف خطوط أنابيب TFX باستخدام واجهات برمجة تطبيقات Python. سننشئ خط أنابيب لإنشاء مخطط من أمثلة الإدخال تلقائيًا. يمكن مراجعة هذا المخطط من قبل الإنسان وتعديله حسب الحاجة. بمجرد الانتهاء من المخطط ، يمكن استخدامه للتدريب والتحقق من صحة المثال في المهام اللاحقة.

بالإضافة إلى CsvExampleGen الذي يستخدم في بسيط TFX خط أنابيب التعليمي ، سوف نستخدم StatisticsGen و SchemaGen :

  • StatisticsGen بحساب إحصاءات عن مجموعة البيانات.
  • SchemaGen يدرس الإحصاءات ويخلق مخطط البيانات الأولية.

اطلع على دليل لكل مكون أو مكونات TFX البرنامج التعليمي لمعرفة المزيد عن هذه المكونات.

اكتب تعريف خط الأنابيب

نحدد وظيفة لإنشاء خط أنابيب TFX. A Pipeline يمثل الكائن خط أنابيب TFX التي يمكن تشغيلها باستخدام واحدة من شبكات الأنابيب تزامن التي تدعم TFX.

def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline for schema generation."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(

  # NEW: Generates schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  components = [

  return tfx.dsl.Pipeline(

قم بتشغيل خط الأنابيب

سوف نستخدم LocalDagRunner كما في السابق تعليمي.

يجب أن تشاهد "INFO: absl: تم الانتهاء من SchemaGen المكون." إذا انتهى خط الأنابيب بنجاح.

سوف نفحص ناتج خط الأنابيب لفهم مجموعة البيانات الخاصة بنا.

مراجعة مخرجات خط الأنابيب

كما هو موضح في البرنامج التعليمي السابق، خط أنابيب TFX تنتج نوعين من النتائج، والتحف و الفوقية DB (MLMD) الذي يحتوي على بيانات التعريف من التحف والإعدام خط الانابيب. حددنا موقع هذه المخرجات في الخلايا المذكورة أعلاه. افتراضيا، يتم تخزين القطع الأثرية تحت pipelines الدليل ويتم تخزين بيانات التعريف باعتباره قاعدة بيانات SQLite تحت metadata الدليل.

يمكنك استخدام واجهات برمجة تطبيقات MLMD لتحديد هذه المخرجات برمجياً. أولاً ، سنحدد بعض وظائف الأداة المساعدة للبحث عن عناصر الإخراج التي تم إنتاجها للتو.

from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id,

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
    if visualization:

from tfx.orchestration.experimental.interactive import standard_visualizations

الآن يمكننا فحص مخرجات تنفيذ خط الأنابيب.

# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]
INFO:absl:MetadataStore with DB connection initialized

حان الوقت لفحص مخرجات كل مكون. كما هو موضح أعلاه، Tensorflow التحقق من صحة البيانات (TFDV) يستخدم في StatisticsGen و SchemaGen ، ويوفر أيضا TFDV التصور من المخرجات من هذه المكونات.

في هذا البرنامج التعليمي ، سوف نستخدم طرق مساعد التصور في TFX والتي تستخدم TFDV داخليًا لإظهار التصور.

افحص الإخراج من StatisticsGen

# docs-infra: no-execute

يمكنك مشاهدة احصائيات مختلفة لبيانات الإدخال. يتم توفير هذه الإحصاءات إلى SchemaGen لبناء مخطط أولي للبيانات تلقائيا.

افحص الإخراج من SchemaGen


يتم استنتاج هذا المخطط تلقائيًا من إخراج StatisticsGen. يجب أن تكون قادرًا على رؤية 4 ميزات FLOAT وميزة 1 INT.

تصدير مخطط قاعدة البيانات للاستخدام في المستقبل

نحن بحاجة إلى مراجعة وتحسين المخطط الذي تم إنشاؤه. يحتاج المخطط الذي تمت مراجعته إلى الاستمرار في استخدامه في خطوط الأنابيب اللاحقة لتدريب نموذج ML. بمعنى آخر ، قد ترغب في إضافة ملف المخطط إلى نظام التحكم في الإصدار الخاص بك لحالات الاستخدام الفعلي. في هذا البرنامج التعليمي ، سنقوم فقط بنسخ المخطط إلى مسار نظام ملفات محدد مسبقًا من أجل البساطة.

import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)

يستخدم ملف المخطط شكل النص العازلة بروتوكول ومثيل بروتو TensorFlow الفوقية مخطط .

print(f'Schema at {SCHEMA_PATH}-----')
!cat {SCHEMA_PATH}/*
Schema at schema-----
feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1

يجب أن تتأكد من مراجعة تعريف المخطط وربما تعديله حسب الحاجة. في هذا البرنامج التعليمي ، سنستخدم مخطط قاعدة البيانات الذي تم إنشاؤه دون تغيير.

التحقق من صحة أمثلة الإدخال وتدريب نموذج ML

وسوف نعود إلى خط الأنابيب الذي خلقنا في بسيط TFX خط أنابيب تعليمي لتدريب نموذج ML واستخدام المخطط إنشاؤها لكتابة رمز تدريبية نموذجية.

ونحن أيضا إضافة ExampleValidator العنصر الذي سوف تبحث عن الشذوذ والقيم المفقودة في مجموعة البيانات الواردة فيما يتعلق المخطط.

اكتب كود التدريب النموذجي

نحن بحاجة إلى كتابة مدونة نموذجية كما فعلنا في بسيط TFX خط أنابيب التعليمي .

النموذج نفسه هو نفسه كما في البرنامج التعليمي السابق ، ولكن هذه المرة سنستخدم المخطط الذي تم إنشاؤه من خط الأنابيب السابق بدلاً من تحديد الميزات يدويًا. لم يتم تغيير معظم الكود. الاختلاف الوحيد هو أننا لا نحتاج إلى تحديد أسماء وأنواع الميزات في هذا الملف. بدلا من ذلك، نقرأ لهم من ملف المخطط.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

# We don't need to specify _FEATURE_KEYS and _FEATURE_SPEC any more.
# Those information can be read from the given schema file.

_LABEL_KEY = 'species'


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  return data_accessor.tf_dataset_factory(
          batch_size=batch_size, label_key=_LABEL_KEY),

def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

    A Keras Model.
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.

  # ++ Changed code: Uses all features in the schema except the label.
  feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]
  # ++ End of the changed code.

  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

    fn_args: Holds args used to train the model as name/value pairs.

  # ++ Changed code: Reads in schema file passed to the Trainer component.
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
  # ++ End of the changed code.

  train_dataset = _input_fn(
  eval_dataset = _input_fn(

  model = _build_keras_model(schema)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

لقد أكملت الآن جميع خطوات الإعداد لإنشاء خط أنابيب TFX لتدريب النموذج.

اكتب تعريف خط الأنابيب

وسوف نقوم بإضافة اثنين من عناصر جديدة، Importer و ExampleValidator . يقوم المستورد بإحضار ملف خارجي إلى خط أنابيب TFX. في هذه الحالة ، يكون ملفًا يحتوي على تعريف المخطط. سوف يفحص ExampleValidator بيانات الإدخال ويتحقق مما إذا كانت جميع بيانات الإدخال تتوافق مع مخطط البيانات الذي قدمناه.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline using predefined schema with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(

  # NEW: Import the schema.
  schema_importer = tfx.dsl.Importer(

  # NEW: Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      schema=schema_importer.outputs['result'],  # Pass the imported schema.

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(

  components = [

      # NEW: Following three components were added to the pipeline.


  return tfx.dsl.Pipeline(

قم بتشغيل خط الأنابيب

يجب أن تشاهد "INFO: absl: تم الانتهاء من وحدة دفع المكونات." إذا انتهى خط الأنابيب بنجاح.

افحص مخرجات خط الأنابيب

لقد قمنا بتدريب نموذج التصنيف لطيور البطريق ، وقمنا أيضًا بالتحقق من صحة أمثلة الإدخال في مكون ExampleValidator. يمكننا تحليل الناتج من ExampleValidator كما فعلنا مع خط الأنابيب السابق.

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(

with Metadata(metadata_connection_config) as metadata_handler:
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]
INFO:absl:MetadataStore with DB connection initialized

يمكن تصور الحالات الشاذة من ExampleValidator أيضًا.


يجب أن تشاهد "لم يتم العثور على أي شذوذ" لكل قسم من الأمثلة. نظرًا لأننا استخدمنا نفس البيانات التي تم استخدامها لإنشاء المخطط في خط الأنابيب هذا ، فلا يُتوقع حدوث أي شذوذ هنا. إذا قمت بتشغيل خط الأنابيب هذا بشكل متكرر باستخدام بيانات واردة جديدة ، فيجب أن يتمكن ExampleValidator من العثور على أي تناقضات بين البيانات الجديدة والمخطط الحالي.

إذا تم العثور على أي شذوذ ، يمكنك مراجعة بياناتك للتحقق لمعرفة ما إذا كانت أي أمثلة لا تتبع افتراضاتك. قد تكون المخرجات من مكونات أخرى مثل StatisticsGen مفيدة. ومع ذلك ، فإن أي حالات شاذة تم العثور عليها لن تمنع تنفيذ المزيد من خطوط الأنابيب.

الخطوات التالية

يمكنك العثور على مزيد من الموارد على https://www.tensorflow.org/tfx/tutorials

يرجى الاطلاع على فهم TFX خطوط الأنابيب لمعرفة المزيد عن مفاهيم مختلفة في TFX.