Цель: Непрерывная подача обнюханных сетевых пакетов в Kafka Producer, подключение его к Spark Streaming, чтобы иметь возможность обрабатывать данные пакета, после этого с использованием предварительно обработанных данных в Tensorflow или Keras.
Я обрабатываю непрерывные данные в Spark Streaming (PySpark), который поступает из Kafka, и теперь я хочу отправить обработанные данные в Tensorflow. Как я могу использовать эти преобразованные потоки DStream в Tensorflow с Python? Спасибо.
В настоящее время обработка в Spark Streaming не применяется, но будет добавлена позже. Вот код py:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from datetime import datetime
if __name__ == '__main__':
sc = SparkContext(appName='Kafkas')
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic],
{'metadata.broker.list': brokers})
lines = kvs.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
Также я использую это для запуска искрового стриминга:
spark-submit --packages org.apache.spark:spark-streaming-kafka-0–8_2.11:2.0.0
spark-kafka.py localhost:9092 topic
@LaSul Я добавил дополнительную информацию, о которой идет речь. Я использую tshark для прослушивания сетевых пакетов, а затем загружаю данные в Kafka в реальном времени. Kafka отправляет данные в Spark Streaming, чтобы иметь возможность передавать данные и обрабатывать их в реальном времени. Общая цель - конвейер машинного обучения, который работает в реальном времени с большими данными. Я застрял в использовании обработанных данных (DStreams) в Tensorflow в реальном времени.
У вас есть два способа решить вашу проблему:
После обработки данных вы можете сохранить их, а затем самостоятельно запустить модель (в Keras?). Просто создайте файл паркета / добавьте к нему, если он уже существует:
if os.path.isdir(DATA_TREATED_PATH):
data.write.mode('append').parquet(DATA_TREATED)
else:
data.write.parquet(DATA_TREATED_PATH)
А затем вы просто создаете свою модель с помощью keras / tensorflow и запускаете ее, может быть, каждый час? Или столько раз, сколько вы хотите, чтобы он обновлялся. Так что это каждый раз запускается с нуля.
Скажите, если вам нужно больше подробностей, но вы узнаете, как легко запускать / загружать модели keras
Я решил проблему с помощью saveAsTextFiles и слил их в другом искровом задании. Parquet не работает с KafkaDStreams в моем приложении и выдает ошибку
Я могу ответить на этот вопрос, но мне нужны подробности. Какой у вас текущий код? Где тебя заблокировали? Что тебе нужно ? Куда ты хочешь пойти ?