Visão geral
Este tutorial centra-se na transmissão de dados a partir de um ElasticSearch aglomerado num
que é então utilizado em conjunto com tf.keras
de formação e de inferência.
Elasticseach é principalmente um mecanismo de busca distribuído que suporta o armazenamento de dados estruturados, não estruturados, geoespaciais, numéricos, etc. Para o propósito deste tutorial, um conjunto de dados com registros estruturados é utilizado.
Pacotes de configuração
O elasticsearch
pacote é utilizado para preparar e armazenar os dados dentro índices ElasticSearch apenas para fins de demonstração. Em clusters de produção do mundo real com vários nós, o cluster pode estar recebendo os dados de conectores como logstash etc.
Uma vez que os dados estão disponíveis no cluster ElasticSearch, única tensorflow-io
é necessária para transmitir os dados para os modelos.
Instale os pacotes tensorflow-io e elasticsearch necessários
pip install tensorflow-io
pip install elasticsearch
Pacotes de importação
import os
import time
from sklearn.model_selection import train_test_split
from elasticsearch import Elasticsearch
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_io as tfio
Validar importações tf e tfio
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.16.0 tensorflow version: 2.3.0
Baixe e configure a instância Elasticsearch
Para fins de demonstração, é usada a versão de código aberto do pacote elasticsearch.
wget -q
wget -q
tar -xzf elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
sudo chown -R daemon:daemon elasticsearch-7.9.2/
shasum -a 512 -c elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512
elasticsearch-oss-7.9.2-linux-x86_64.tar.gz: OK
Execute a instância como um processo daemon
sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch
Starting job # 0 in a separate thread.
# Sleep for few seconds to let the instance start.
Uma vez que a instância foi iniciada, grep para elasticsearch
nos processos lista para confirmar a disponibilidade.
ps -ef | grep elasticsearch
root 144 142 0 21:24 ? 00:00:00 sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch daemon 145 144 86 21:24 ? 00:00:17 /content/elasticsearch-7.9.2/jdk/bin/java -Xshare:auto -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -XX:+ShowCodeDetailsInExceptionMessages -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dio.netty.allocator.numDirectArenas=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.locale.providers=SPI,COMPAT -Xms1g -Xmx1g -XX:+UseG1GC -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m -XX:MaxDirectMemorySize=536870912 -Des.path.home=/content/elasticsearch-7.9.2 -Des.path.conf=/content/elasticsearch-7.9.2/config -Des.distribution.flavor=oss -Des.distribution.type=tar -Des.bundled_jdk=true -cp /content/elasticsearch-7.9.2/lib/* org.elasticsearch.bootstrap.Elasticsearch root 382 380 0 21:24 ? 00:00:00 grep elasticsearch
consulte o ponto de extremidade de base para recuperar informações sobre o cluster.
curl -sX GET "localhost:9200/"
{ "name" : "d1bc7d054c69", "cluster_name" : "elasticsearch", "cluster_uuid" : "P8YXfKqYS-OS3k9CdMmlsw", "version" : { "number" : "7.9.2", "build_flavor" : "oss", "build_type" : "tar", "build_hash" : "d34da0ea4a966c4e49417f2da2f244e3e97b4e6e", "build_date" : "2020-09-23T00:45:33.626720Z", "build_snapshot" : false, "lucene_version" : "8.6.2", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
Explore o conjunto de dados
Para o propósito deste tutorial, vamos baixar o Petfinder conjunto de dados e alimentar os dados em ElasticSearch manualmente. O objetivo desse problema de classificação é prever se o pet será adotado ou não.
dataset_url = ''
csv_file = 'datasets/petfinder-mini/petfinder-mini.csv'
tf.keras.utils.get_file('', dataset_url,
extract=True, cache_dir='.')
pf_df = pd.read_csv(csv_file)
Downloading data from 1671168/1668792 [==============================] - 0s 0us/step
Para o propósito do tutorial, modificações são feitas na coluna do rótulo. 0 indicará que o animal de estimação não foi adotado e 1 indicará que foi.
# In the original dataset "4" indicates the pet was not adopted.
pf_df['target'] = np.where(pf_df['AdoptionSpeed']==4, 0, 1)
# Drop un-used columns.
pf_df = pf_df.drop(columns=['AdoptionSpeed', 'Description'])
# Number of datapoints and columns
len(pf_df), len(pf_df.columns)
(11537, 14)
Divida o conjunto de dados
train_df, test_df = train_test_split(pf_df, test_size=0.3, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
Number of training samples: 8075 Number of testing sample: 3462
Armazene os dados de treinamento e teste em índices de pesquisa elástica
Armazenar os dados no cluster elasticsearch local simula um ambiente para recuperação remota de dados contínua para fins de treinamento e inferência.
ES_NODES = "http://localhost:9200"
def prepare_es_data(index, doc_type, df):
records = df.to_dict(orient="records")
es_data = []
for idx, record in enumerate(records):
meta_dict = {
"index": {
"_index": index,
"_type": doc_type,
"_id": idx
return es_data
def index_es_data(index, es_data):
es_client = Elasticsearch(hosts = [ES_NODES])
if es_client.indices.exists(index):
print("deleting the '{}' index.".format(index))
res = es_client.indices.delete(index=index)
print("Response from server: {}".format(res))
print("creating the '{}' index.".format(index))
res = es_client.indices.create(index=index)
print("Response from server: {}".format(res))
print("bulk index the data")
res = es_client.bulk(index=index, body=es_data, refresh = True)
print("Errors: {}, Num of records indexed: {}".format(res["errors"], len(res["items"])))
train_es_data = prepare_es_data(index="train", doc_type="pet", df=train_df)
test_es_data = prepare_es_data(index="test", doc_type="pet", df=test_df)
index_es_data(index="train", es_data=train_es_data)
index_es_data(index="test", es_data=test_es_data)
creating the 'train' index. Response from server: {'acknowledged': True, 'shards_acknowledged': True, 'index': 'train'} bulk index the data /usr/local/lib/python3.6/dist-packages/elasticsearch/connection/ ElasticsearchDeprecationWarning: [types removal] Specifying types in bulk requests is deprecated. warnings.warn(message, category=ElasticsearchDeprecationWarning) Errors: False, Num of records indexed: 8075 creating the 'test' index. Response from server: {'acknowledged': True, 'shards_acknowledged': True, 'index': 'test'} bulk index the data Errors: False, Num of records indexed: 3462
Prepare conjuntos de dados tfio
Uma vez que os dados estão disponíveis no cluster, apenas tensorflow-io
é necessária para transmitir os dados a partir dos índices. O elasticsearch.ElasticsearchIODataset
classe é utilizado para esta finalidade. Os herda da classe de
e, portanto, expõe todas as funcionalidades úteis do
fora da caixa.
Conjunto de dados de treinamento
HEADERS = {"Content-Type": "application/json"}
train_ds = tfio.experimental.elasticsearch.ElasticsearchIODataset(
# Prepare a tuple of (features, label)
train_ds = v: (v, v.pop("target")))
train_ds = train_ds.batch(BATCH_SIZE)
Connection successful: http://localhost:9200/_cluster/health
Testando conjunto de dados
test_ds = tfio.experimental.elasticsearch.ElasticsearchIODataset(
# Prepare a tuple of (features, label)
test_ds = v: (v, v.pop("target")))
test_ds = test_ds.batch(BATCH_SIZE)
Connection successful: http://localhost:9200/_cluster/health
Defina as camadas de pré-processamento do keras
De acordo com o tutorial de dados estruturados , recomenda-se usar as Camadas Keras pré-processamento como eles são mais intuitivo, e pode ser facilmente integrado com os modelos. No entanto, o padrão feature_columns também pode ser usado.
Para uma melhor compreensão dos preprocessing_layers
na classificação dados estruturados, consulte o tutorial de dados estruturados
def get_normalization_layer(name, dataset):
# Create a Normalization layer for our feature.
normalizer = preprocessing.Normalization()
# Prepare a Dataset that only yields our feature.
feature_ds = x, y: x[name])
# Learn the statistics of the data.
return normalizer
def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
# Create a StringLookup layer which will turn strings into integer indices
if dtype == 'string':
index = preprocessing.StringLookup(max_tokens=max_tokens)
index = preprocessing.IntegerLookup(max_values=max_tokens)
# Prepare a Dataset that only yields our feature
feature_ds = x, y: x[name])
# Learn the set of possible values and assign them a fixed integer index.
# Create a Discretization for our integer indices.
encoder = preprocessing.CategoryEncoding(max_tokens=index.vocab_size())
# Prepare a Dataset that only yields our feature.
feature_ds =
# Learn the space of possible indices.
# Apply one-hot encoding to our indices. The lambda function captures the
# layer so you can use them, or include them in the functional model later.
return lambda feature: encoder(index(feature))
Busque um lote e observe as características de um registro de amostra. Isso ajudará na definição das keras de pré-processamento camadas para treinar o tf.keras
ds_iter = iter(train_ds)
features, label = next(ds_iter)
{key: value.numpy()[0] for key,value in features.items()}
{'Age': 2, 'Breed1': b'Tabby', 'Color1': b'Black', 'Color2': b'Cream', 'Fee': 0, 'FurLength': b'Short', 'Gender': b'Male', 'Health': b'Healthy', 'MaturitySize': b'Small', 'PhotoAmt': 4, 'Sterilized': b'No', 'Type': b'Cat', 'Vaccinated': b'No'}
Escolha um subconjunto de recursos.
all_inputs = []
encoded_features = []
# Numeric features.
for header in ['PhotoAmt', 'Fee']:
numeric_col = tf.keras.Input(shape=(1,), name=header)
normalization_layer = get_normalization_layer(header, train_ds)
encoded_numeric_col = normalization_layer(numeric_col)
# Categorical features encoded as string.
categorical_cols = ['Type', 'Color1', 'Color2', 'Gender', 'MaturitySize',
'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Breed1']
for header in categorical_cols:
categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='string')
encoding_layer = get_category_encoding_layer(header, train_ds, dtype='string',
encoded_categorical_col = encoding_layer(categorical_col)
Construir, compilar e treinar o modelo
# Set the parameters
# Convert the feature columns into a tf.keras layer
all_features = tf.keras.layers.concatenate(encoded_features)
# design/build the model
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dropout(0.5)(x)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(all_inputs, output)
tf.keras.utils.plot_model(model, rankdir='LR', show_shapes=True)
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model, epochs=EPOCHS)
Epoch 1/10 /usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/ UserWarning: Input dict contained keys ['Age'] which did not match any model input. They will be ignored by the model. [n for n in tensors.keys() if n not in ref_input_names]) 253/253 [==============================] - 4s 14ms/step - loss: 0.6169 - accuracy: 0.6042 Epoch 2/10 253/253 [==============================] - 4s 14ms/step - loss: 0.5634 - accuracy: 0.6937 Epoch 3/10 253/253 [==============================] - 4s 15ms/step - loss: 0.5573 - accuracy: 0.6981 Epoch 4/10 253/253 [==============================] - 4s 15ms/step - loss: 0.5528 - accuracy: 0.7087 Epoch 5/10 253/253 [==============================] - 4s 14ms/step - loss: 0.5512 - accuracy: 0.7173 Epoch 6/10 253/253 [==============================] - 4s 15ms/step - loss: 0.5456 - accuracy: 0.7219 Epoch 7/10 253/253 [==============================] - 4s 15ms/step - loss: 0.5397 - accuracy: 0.7283 Epoch 8/10 253/253 [==============================] - 4s 14ms/step - loss: 0.5385 - accuracy: 0.7331 Epoch 9/10 253/253 [==============================] - 4s 15ms/step - loss: 0.5355 - accuracy: 0.7326 Epoch 10/10 253/253 [==============================] - 4s 15ms/step - loss: 0.5412 - accuracy: 0.7321 <tensorflow.python.keras.callbacks.History at 0x7f5c235112e8>
Inferir sobre os dados de teste
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/ UserWarning: Input dict contained keys ['Age'] which did not match any model input. They will be ignored by the model. [n for n in tensors.keys() if n not in ref_input_names]) 109/109 [==============================] - 2s 15ms/step - loss: 0.5344 - accuracy: 0.7421 test loss, test acc: [0.534355640411377, 0.7420566082000732]