API tập dữ liệu Avro

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

Mục tiêu của Avro Dataset API là để tải Avro định dạng dữ liệu nguyên bản vào TensorFlow như TensorFlow bộ dữ liệu . Avro là một hệ thống tuần tự hóa dữ liệu tương tự như Bộ đệm giao thức. Nó được sử dụng rộng rãi trong Apache Hadoop nơi nó có thể cung cấp cả định dạng tuần tự hóa cho dữ liệu liên tục và định dạng dây để giao tiếp giữa các nút Hadoop. Dữ liệu Avro là một định dạng dữ liệu nhị phân nén, hướng hàng. Nó dựa trên lược đồ được lưu trữ dưới dạng tệp JSON riêng biệt. Đối với spec của định dạng và schema khai Avro, vui lòng tham khảo hướng dẫn chính thức .

Gói thiết lập

Cài đặt gói tensorflow-io bắt buộc

pip install tensorflow-io

Nhập gói

import tensorflow as tf
import tensorflow_io as tfio

Xác thực nhập tf và tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Cách sử dụng

Khám phá tập dữ liệu

Đối với mục đích của hướng dẫn này, chúng ta hãy tải xuống tập dữ liệu Avro mẫu.

Tải xuống tệp Avro mẫu:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Tải xuống tệp giản đồ tương ứng của tệp Avro mẫu:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

Trong ví dụ trên, tập dữ liệu Avro thử nghiệm đã được tạo dựa trên tập dữ liệu mnist. Các mnist bộ dữ liệu gốc ở định dạng TFRecord được tạo ra từ TF tên tập dữ liệu . Tuy nhiên, tập dữ liệu mnist quá lớn như một tập dữ liệu demo. Vì mục đích đơn giản, hầu hết nó đã được cắt bỏ và một số hồ sơ đầu tiên chỉ được lưu giữ. Hơn nữa, trang trí bổ sung được thực hiện cho image hiện trường trong tập dữ liệu mnist gốc và ánh xạ nó vào features lĩnh vực trong Avro. Vì vậy, các tập tin Avro train.avro có 4 hồ sơ, mỗi trong số đó có 3 lĩnh vực: features , mà là một mảng int, label , một int hoặc null, và dataType , một enum. Để xem giải mã train.avro (Lưu ý bản gốc tập tin dữ liệu Avro không phải là con người có thể đọc được như Avro là một định dạng nén chặt):

Cài đặt gói bắt buộc để đọc tệp Avro:

pip install avro

Để đọc và in tệp Avro ở định dạng con người có thể đọc được:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

Và giản đồ của train.avro mà được đại diện bởi train.avsc là một tập tin định dạng JSON. Để xem train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Chuẩn bị tập dữ liệu

Tải train.avro như TensorFlow bộ dữ liệu với Avro bộ dữ liệu API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

Ví dụ cải đạo trên train.avro vào dataset tensorflow. Mỗi phần tử của tập dữ liệu là một từ điển có khóa là tên đối tượng, giá trị là tensor thưa thớt hoặc dày đặc được chuyển đổi. Ví dụ, nó chuyển đổi features , label , dataType trường để một VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), và FixedLenFeature (DenseTensor) tương ứng. Kể từ batch_size là 3, nó ép buộc 3 hồ sơ từ train.avro thành một phần tử trong tập dữ liệu kết quả. Đối với hồ sơ đầu tiên trong train.avro có nhãn là null, Avro đọc thay thế nó bằng giá trị mặc định cụ thể (-100). Trong ví dụ này, đang có 4 hồ sơ trong tổng số trong train.avro . Kể từ khi kích thước hàng loạt là 3, kết quả dữ liệu chứa 3 yếu tố, cuối cùng trong số đó của kích thước hàng loạt là 1. Tuy nhiên người dùng cũng có thể thả đợt cuối cùng nếu kích thước nhỏ hơn kích thước hàng loạt bằng cách cho phép drop_final_batch . Ví dụ:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Người ta cũng có thể tăng num_parallel_reads để thúc đẩy quá trình xử lý dữ liệu Avro bằng cách tăng tính song song phân tích cú pháp / đọc avro.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Đối với việc sử dụng chi tiết về make_avro_record_dataset , vui lòng tham khảo API doc .

Đào tạo mô hình tf.keras với tập dữ liệu Avro

Bây giờ chúng ta hãy xem qua một ví dụ từ đầu đến cuối về đào tạo mô hình tf.keras với tập dữ liệu Avro dựa trên tập dữ liệu mnist.

Tải train.avro như TensorFlow bộ dữ liệu với Avro bộ dữ liệu API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Xác định một mô hình keras đơn giản:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Đào tạo mô hình keras với tập dữ liệu Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

Tập dữ liệu avro có thể phân tích cú pháp và ép buộc bất kỳ dữ liệu avro nào vào TensorFlow tensors, bao gồm các bản ghi trong bản ghi, bản đồ, mảng, nhánh và liệt kê. Thông tin phân tích cú pháp được chuyển vào triển khai tập dữ liệu avro dưới dạng bản đồ nơi các khóa mã hóa cách phân tích cú pháp các giá trị dữ liệu mã hóa cách ép dữ liệu vào các tensors TensorFlow - quyết định kiểu nguyên thủy (ví dụ: bool, int, long, float, double, string ) cũng như kiểu tensor (ví dụ: thưa thớt hoặc dày đặc). Danh sách các kiểu phân tích cú pháp của TensorFlow (xem Bảng 1) và độ ép của các kiểu nguyên thủy (Bảng 2) được cung cấp.

Bảng 1 các loại phân tích cú pháp TensorFlow được hỗ trợ:

Các loại phân tích cú pháp TensorFlow TensorFlow Tensors Giải trình
tf.FixedLenFeature ([], tf.int32) căng đặc Phân tích cú pháp một tính năng có độ dài cố định; nghĩa là tất cả các hàng có cùng số phần tử không đổi, ví dụ: chỉ một phần tử hoặc một mảng luôn có cùng số phần tử cho mỗi hàng
tf.SparseFeature (index_key = ['key_1st_index', 'key_2nd_index'], value_key = 'key_value', dtype = tf.int64, size = [20, 50]) tensor thưa thớt Phân tích cú pháp một đối tượng địa lý thưa thớt trong đó mỗi hàng có một danh sách chỉ số và giá trị có độ dài thay đổi. 'Index_key' xác định các chỉ số. 'Value_key' xác định giá trị. 'Dtype' là kiểu dữ liệu. 'Kích thước' là giá trị chỉ mục tối đa dự kiến ​​cho mỗi mục nhập chỉ mục
tfio.experimental.columnar.VarLenFeatureWithRank ([], tf.int64) tensor thưa thớt Phân tích cú pháp một tính năng có độ dài thay đổi; điều đó có nghĩa là mỗi hàng dữ liệu có thể có một số phần tử thay đổi, ví dụ: hàng đầu tiên có 5 phần tử, hàng thứ 2 có 7 phần tử

Bảng 2 chuyển đổi được hỗ trợ từ các loại Avro sang các loại TensorFlow:

Loại nguyên thủy Avro Loại nguyên thủy TensorFlow
boolean: một giá trị nhị phân tf.bool
byte: một chuỗi các byte không dấu 8 bit tf.string
double: số dấu phẩy động IEEE 64-bit chính xác gấp đôi tf.float64
enum: kiểu liệt kê tf.string sử dụng tên biểu tượng
float: số dấu chấm động 32-bit IEEE chính xác đơn tf.float32
int: số nguyên có dấu 32 bit tf.int32
long: số nguyên có dấu 64 bit tf.int64
null: không có giá trị sử dụng giá trị mặc định
string: chuỗi ký tự unicode tf.string

Một tập hợp đầy đủ các ví dụ về Avro bộ dữ liệu API được cung cấp trong các bài kiểm tra .