API Kumpulan Data Avro

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

Ringkasan

Tujuan dari Avro Dataset API adalah untuk memuat Avro diformat data yang native ke TensorFlow sebagai TensorFlow dataset . Avro adalah sistem serialisasi data yang mirip dengan Protocol Buffer. Ini banyak digunakan di Apache Hadoop di mana ia dapat menyediakan format serialisasi untuk data persisten, dan format kabel untuk komunikasi antara node Hadoop. Data avro adalah format data biner yang dipadatkan dan berorientasi pada baris. Itu bergantung pada skema yang disimpan sebagai file JSON terpisah. Untuk spec Avro Format dan skema deklarasi, silakan lihat manual resmi .

Paket pengaturan

Instal paket tensorflow-io yang diperlukan

pip install tensorflow-io

paket impor

import tensorflow as tf
import tensorflow_io as tfio

Validasi impor tf dan tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Penggunaan

Jelajahi kumpulan data

Untuk tujuan tutorial ini, mari unduh contoh dataset Avro.

Unduh contoh file Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Unduh file skema yang sesuai dari contoh file Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

Dalam contoh di atas, kumpulan data Avro pengujian dibuat berdasarkan kumpulan data mnist. Dataset mnist asli dalam format TFRecord dihasilkan dari TF bernama dataset . Namun, kumpulan data mnist terlalu besar sebagai kumpulan data demo. Untuk tujuan kesederhanaan, sebagian besar dipangkas dan hanya beberapa catatan pertama yang disimpan. Selain itu, pemangkasan tambahan dilakukan untuk image lapangan dalam dataset mnist asli dan dipetakan ke features lapangan di Avro. Jadi file avro train.avro memiliki 4 catatan, masing-masing memiliki 3 bidang: features , yang merupakan array dari int, label , int atau null, dan dataType , enum. Untuk melihat diterjemahkan train.avro (Catatan asli data file avro tidak dibaca manusia sebagai avro adalah format dipadatkan):

Instal paket yang diperlukan untuk membaca file Avro:

pip install avro

Untuk membaca dan mencetak file Avro dalam format yang dapat dibaca manusia:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

Dan skema train.avro yang diwakili oleh train.avsc adalah file berformat JSON. Untuk melihat train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Siapkan kumpulan datanya

Beban train.avro sebagai TensorFlow dataset dengan Avro dataset API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

Contoh di atas bertobat train.avro ke dataset tensorflow. Setiap elemen kumpulan data adalah kamus yang kuncinya adalah nama fitur, nilainya adalah tensor renggang atau padat yang dikonversi. Misalnya, itu mengkonversi features , label , dataType lapangan ke VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), dan FixedLenFeature (DenseTensor) masing-masing. Sejak batch_size adalah 3, itu memaksa 3 catatan dari train.avro menjadi satu elemen dalam dataset hasil. Untuk catatan pertama di train.avro yang label null, Menggantikan pembaca avro dengan nilai default yang ditentukan (-100). Dalam contoh ini, sudah ada 4 catatan total di train.avro . Karena ukuran batch 3, hasil dataset mengandung 3 unsur, terakhir yang ini ukuran batch 1. Namun pengguna juga mampu menjatuhkan batch terakhir jika ukuran lebih kecil dari ukuran batch dengan memungkinkan drop_final_batch . Misalnya:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Seseorang juga dapat meningkatkan num_parallel_reads untuk mempercepat pemrosesan data Avro dengan meningkatkan paralelisme avro parse/read.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Untuk penggunaan rinci make_avro_record_dataset , silakan lihat API doc .

Latih model tf.keras dengan dataset Avro

Sekarang mari kita telusuri contoh end-to-end pelatihan model tf.keras dengan dataset Avro berdasarkan dataset mnist.

Beban train.avro sebagai TensorFlow dataset dengan Avro dataset API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Tentukan model keras sederhana:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Latih model keras dengan dataset Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

Kumpulan data avro dapat mengurai dan memaksa data avro apa pun ke dalam tensor TensorFlow, termasuk catatan dalam catatan, peta, larik, cabang, dan enumerasi. Informasi penguraian diteruskan ke implementasi kumpulan data avro sebagai peta di mana kunci mengkodekan cara mengurai nilai data yang dikodekan tentang cara memaksa data ke tensor TensorFlow – memutuskan tipe primitif (mis. bool, int, long, float, double, string ) serta jenis tensor (misalnya jarang atau padat). Daftar tipe parser TensorFlow (lihat Tabel 1) dan paksaan tipe primitif (Tabel 2) disediakan.

Tabel 1 jenis parser TensorFlow yang didukung:

Jenis Parser TensorFlow TensorFlow Tensor Penjelasan
tf.FixedLenFeature([], tf.int32) tensor padat Parsing fitur panjang tetap; yaitu semua baris memiliki jumlah elemen yang sama, misalnya hanya satu elemen atau array yang selalu memiliki jumlah elemen yang sama untuk setiap baris
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) tensor jarang Parsing fitur sparse di mana setiap baris memiliki daftar panjang variabel indeks dan nilai. 'index_key' mengidentifikasi indeks. 'value_key' mengidentifikasi nilai. 'dtype' adalah tipe data. 'Ukuran' adalah nilai indeks maksimum yang diharapkan untuk setiap entri indeks
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) tensor jarang Parsing fitur panjang variabel; itu artinya setiap baris data dapat memiliki sejumlah variabel elemen, misal baris ke-1 memiliki 5 elemen, baris ke-2 memiliki 7 elemen

Tabel 2 konversi yang didukung dari tipe Avro ke tipe TensorFlow:

Tipe Primitif Avro Jenis Primitif TensorFlow
boolean: nilai biner tf.bool
byte: urutan byte tidak bertanda 8-bit tf.string
ganda: nomor floating point IEEE 64-bit presisi ganda tf.float64
enum: tipe enumerasi tf.string menggunakan nama simbol
float: nomor floating point IEEE 32-bit presisi tunggal tf.float32
int: bilangan bulat bertanda 32-bit tf.int32
panjang: bilangan bulat bertanda 64-bit tf.int64
nol: tidak ada nilai menggunakan nilai default
string: urutan karakter unicode tf.string

Sebuah seperangkat contoh Avro dataset API disediakan dalam tes .