למידת מכונה חזקה על הזרמת נתונים באמצעות Kafka ו- Tensorflow-IO

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

הדרכה זו מתמקדת הזרמת נתונים מתוך קפקא אשכול לתוך tf.data.Dataset אשר משמש בשילוב עם tf.keras לאימונים היקש.

קפקא היא בעיקרה פלטפורמת הזרמת אירועים מבוזרת המספקת נתונים ניתנים להרחבה וסובלנות לתקלות על פני צינורות נתונים. זהו מרכיב טכני חיוני של שפע של ארגונים גדולים שבהם אספקת נתונים קריטיים למשימה היא דרישה ראשית.

להכין

התקן את חבילות tensorflow-io ו-kafka הנדרשות

pip install tensorflow-io
pip install kafka-python

ייבוא ​​חבילות

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

אימות יבוא tf ו-tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1
tensorflow version: 2.8.0-rc0

הורד והגדר מופעים של Kafka ו-Zookeeper

למטרות הדגמה, המופעים הבאים מוגדרים באופן מקומי:

  • קפקא (ברוקרים: 127.0.0.1:9092)
  • שומר גן החיות (צומת: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz

שימוש בתצורות ברירת המחדל (המסופקות על ידי Apache Kafka) לסיבוב המופעים.

./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running

לאחר המופעים מתחילים בפעילות תהליכי daemon, grep עבור kafka ברשימת התהליכים. שני תהליכי ה-Java תואמים ל-zookeeper ולמופעי קפקא.

ps -ef | grep kafka
kbuilder 27856 20044  4 20:28 ?        00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000
kbuilder 28271     1 16 20:28 ?        00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties
kbuilder 28635     1 57 20:28 ?        00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties
kbuilder 28821 27860  0 20:28 pts/0    00:00:00 /bin/bash -c ps -ef | grep kafka
kbuilder 28823 28821  0 20:28 pts/0    00:00:00 grep kafka

צור את נושאי הקפקא עם המפרט הבא:

  • susy-train: partitions=1, replication-factor=1
  • susy-test: partitions=2, replication-factor=1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.

תאר את הנושא לפרטים על התצורה

./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train Partition: 0  Leader: 0 Replicas: 0   Isr: 0
Topic: susy-test  PartitionCount: 2 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0 Replicas: 0   Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0 Replicas: 0   Isr: 0

גורם השכפול 1 מציין שהנתונים אינם משוכפלים. זאת בשל נוכחותו של מתווך יחיד במערך הקפקא שלנו. במערכות ייצור, מספר שרתי האתחול יכול להיות בטווח של 100 צמתים. כאן נכנסת לתמונה סבילות התקלות באמצעות שכפול.

עיין Docs לפרטים נוספים.

מערך נתונים של SUSY

בהיות קפקא פלטפורמת הזרמת אירועים, מאפשרת כתיבת נתונים ממקורות שונים לתוכה. לדוגמה:

  • יומני תעבורה באינטרנט
  • מדידות אסטרונומיות
  • נתוני חיישני IoT
  • ביקורות על מוצרים ועוד רבים.

לצורך הדרכה זו, מאפשרת להוריד את סוזי במערך ולהאכיל את הנתונים לתוך קפקא באופן ידני. המטרה של בעיית סיווג זו היא להבחין בין תהליך אות אשר מייצר חלקיקים סופר-סימטריים לבין תהליך רקע שאינו עושה זאת.

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

חקור את מערך הנתונים

העמודה הראשונה היא תווית המחלקה (1 לאות, 0 לרקע), ואחריה 18 תכונות (8 תכונות ברמה נמוכה ואז 10 תכונות ברמה גבוהה). 8 התכונות הראשונות הן תכונות קינמטיות הנמדדות על ידי גלאי החלקיקים במאיץ. 10 התכונות האחרונות הן פונקציות של 8 התכונות הראשונות. אלו הן תכונות ברמה גבוהה שנגזרו על ידי פיזיקאים כדי לעזור להבחין בין שני המעמדות.

COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

מערך הנתונים כולו מורכב מ-5 מיליון שורות. עם זאת, לצורך מדריך זה, הבה ניקח בחשבון רק חלק ממערך הנתונים (100,000 שורות) כך שפחות זמן מושקע בהעברת הנתונים ויותר זמן בהבנת הפונקציונליות של ה-API.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)

פצל את מערך הנתונים

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)

אחסן את הרכבת ונתוני הבדיקה בקפקא

אחסון הנתונים בקפקא מדמה סביבה לאחזור נתונים רציף מרחוק למטרות הדרכה והסקת מסקנות.

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test

הגדר את מערך הנתונים של הרכבת tfio

IODataset בכיתה מנוצל עבור הזרמת נתוני קפקא לתוך tensorflow. יורש בכיתה מ tf.data.Dataset ובכך יש את כל הפונקציות שימושי של tf.data.Dataset מהקופסה.

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

בנה ואימון המודל

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 129       
                                                                 
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?"
  return dispatch_target(*args, **kwargs)
938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691
Epoch 2/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956
Epoch 10/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962
<keras.callbacks.History at 0x7fb364fd2a90>

מכיוון שרק חלק ממערך הנתונים נמצא בשימוש, הדיוק שלנו מוגבל ל~78% במהלך שלב ההדרכה. עם זאת, אל תהסס לאחסן נתונים נוספים ב-kafka לביצועי דגם טובים יותר. כמו כן, מכיוון שהמטרה הייתה רק להדגים את הפונקציונליות של מערכי הנתונים של tfio kafka, נעשה שימוש ברשת עצבית קטנה יותר ופחות מסובכת. עם זאת, ניתן להגדיל את המורכבות של המודל, לשנות את אסטרטגיית הלמידה, לכוון פרמטרים היפר וכו' למטרות חקירה. לקבלת גישה הבסיס, עיין זה מאמר .

הסיק על נתוני הבדיקה

כדי להסיק על נתוני הבדיקה על ידי הקפדה על סמנטיקה "בדיוק-פעם" יחד עם עמידות גבוהה בפני תקלות, את streaming.KafkaGroupIODataset יכול להיות מנוצל.

הגדר את מערך הנתונים של בדיקת tfio

stream_timeout בלוקי הפרמטר למשך פרק זמן נתון עבור נקודה נתונים חדשות להיות מוזרמים לתוך הנושא. זה מסיר את הצורך ביצירת מערכי נתונים חדשים אם הנתונים מוזרמים לנושא באופן לסירוגין.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)

למרות שניתן להשתמש בשיעור זה למטרות הדרכה, ישנן אזהרות שיש לטפל בהן. לאחר כול הודעות לקרוא קפקא ואת הקיזוז האחרון מחויב באמצעות streaming.KafkaGroupIODataset , הצרכן אינו מחדש לקרוא את המסרים מההתחלה. לפיכך, בזמן האימון, ניתן להתאמן רק לתקופה בודדת כשהנתונים זורמים פנימה ללא הרף. לסוג זה של פונקציונליות יש מקרי שימוש מוגבלים במהלך שלב האימון שבהם, ברגע שנקודת נתונים נצרכה על ידי המודל, היא כבר לא קיימת. נדרש וניתן לזרוק.

עם זאת, פונקציונליות זו זורחת כשמדובר בהסקת מסקנות חזקות עם סמנטיקה של פעם אחת בדיוק.

להעריך את הביצועים על נתוני הבדיקה

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194
2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915
test loss, test acc: [0.4436523914337158, 0.7915250062942505]
2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

מכיוון שההסקה מבוססת על סמנטיקה של 'פעם אחת בדיוק', ניתן להפעיל את ההערכה במערך המבחן פעם אחת בלבד. על מנת להפעיל שוב את ההסקה על נתוני הבדיקה, יש להשתמש בקבוצת צרכנים חדשה.

עקוב אחר בפיגור לקזז של testcg קבוצת הצרכנים

./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          susy-test       0          21626           21626           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka
testcg          susy-test       1          18374           18374           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka

לאחר current-offset בו תואם log-end-offset עבור כל המחיצות, זה מצביע על כך הצרכן (ים) השלימו מושכת את כל ההודעות מהנושא קפקא.

למידה מקוונת

פרדיגמת למידת המכונה המקוונת מעט שונה מהדרך המסורתית/קונבנציונלית לאימון מודלים של למידת מכונה. במקרה הקודם, המודל ממשיך ללמוד/לעדכן בהדרגה את הפרמטרים שלו ברגע שנקודות הנתונים החדשות יהיו זמינות ותהליך זה צפוי להימשך ללא הגבלת זמן. זאת בניגוד לגישות האחרונות שבו המערך נתון קבוע המודל סובב על זה n מספר פעמים. בלמידה מקוונת, ייתכן שהנתונים שנצרכו על ידי המודל לא יהיו זמינים שוב לאימון.

על ידי ניצול streaming.KafkaBatchIODataset , ניתן כיום להכשיר את הדגמים בצורה זו. בואו נמשיך להשתמש במערך הנתונים של SUSY שלנו להדגמת פונקציונליות זו.

מערך ההדרכה של tfio ללמידה מקוונת

streaming.KafkaBatchIODataset דומה streaming.KafkaGroupIODataset ב API של זה. בנוסף, מומלץ לנצל את stream_timeout הפרמטר כדי להגדיר את משך הזמן שבו נתון יחסמו הודעות חדשות לפני שתיסגר. ב בערכאה להלן, הנתונים מוגדר עם stream_timeout של 10000 מילישניות. זה מרמז שאחרי שכל ההודעות מהנושא נצרכו, מערך הנתונים ימתין 10 שניות נוספות לפני תום הזמן וההתנתקות מאשכול הקפקא. אם הודעות חדשות מוזרמות לנושא לפני הזמן הקצוב, צריכת הנתונים והדרכת המודלים יתחדשו עבור אותן נקודות נתונים שנצרכו לאחרונה. כדי לחסום לזמן בלתי מוגבל, ולהגדיר אותו -1 .

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

כל פריט כי online_train_ds מייצר הוא tf.data.Dataset כשלעצמו. לפיכך, ניתן ליישם את כל התמורות הסטנדרטיות כרגיל.

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933
2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

ניתן לשמור את המודל שהוכשר באופן מצטבר באופן תקופתי (בהתבסס על מקרי שימוש) וניתן להשתמש בו כדי להסיק על נתוני הבדיקה במצב מקוון או במצב לא מקוון.

הפניות:

  • Baldi, P., P. Sadowski, and D. Whiteson. "חיפוש אחר חלקיקים אקזוטיים בפיזיקה באנרגיה גבוהה עם למידה עמוקה." Nature Communications 5 (2 ביולי 2014)

  • סוזי בסיס הנתונים: https://archive.ics.uci.edu/ml/datasets/SUSY#