Как интегрировать Spark Streaming с Tensorflow?

Цель: Непрерывная подача обнюханных сетевых пакетов в Kafka Producer, подключение его к Spark Streaming, чтобы иметь возможность обрабатывать данные пакета, после этого с использованием предварительно обработанных данных в Tensorflow или Keras.

Я обрабатываю непрерывные данные в Spark Streaming (PySpark), который поступает из Kafka, и теперь я хочу отправить обработанные данные в Tensorflow. Как я могу использовать эти преобразованные потоки DStream в Tensorflow с Python? Спасибо.

В настоящее время обработка в Spark Streaming не применяется, но будет добавлена ​​позже. Вот код py:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from datetime import datetime

if __name__ == '__main__':
    sc = SparkContext(appName='Kafkas')
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], 
                                       {'metadata.broker.list': brokers})
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

Также я использую это для запуска искрового стриминга:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0–8_2.11:2.0.0 
spark-kafka.py localhost:9092 topic

Я могу ответить на этот вопрос, но мне нужны подробности. Какой у вас текущий код? Где тебя заблокировали? Что тебе нужно ? Куда ты хочешь пойти ?

LaSul 19.12.2018 10:58

@LaSul Я добавил дополнительную информацию, о которой идет речь. Я использую tshark для прослушивания сетевых пакетов, а затем загружаю данные в Kafka в реальном времени. Kafka отправляет данные в Spark Streaming, чтобы иметь возможность передавать данные и обрабатывать их в реальном времени. Общая цель - конвейер машинного обучения, который работает в реальном времени с большими данными. Я застрял в использовании обработанных данных (DStreams) в Tensorflow в реальном времени.

Burak 19.12.2018 12:05
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
2
891
1

Ответы 1

У вас есть два способа решить вашу проблему:

  1. После обработки данных вы можете сохранить их, а затем самостоятельно запустить модель (в Keras?). Просто создайте файл паркета / добавьте к нему, если он уже существует:

    if os.path.isdir(DATA_TREATED_PATH):
        data.write.mode('append').parquet(DATA_TREATED)
    else:
        data.write.parquet(DATA_TREATED_PATH)
    

А затем вы просто создаете свою модель с помощью keras / tensorflow и запускаете ее, может быть, каждый час? Или столько раз, сколько вы хотите, чтобы он обновлялся. Так что это каждый раз запускается с нуля.

  1. Вы обрабатываете свои данные, сохраняете их, как и раньше, но после этого вы загружаете модель, обучаете новые данные / новую партию, а затем сохраняете модель. Это называется онлайн-обучением, потому что вы не запускаете свою модель с нуля.

Скажите, если вам нужно больше подробностей, но вы узнаете, как легко запускать / загружать модели keras

LaSul 19.12.2018 13:58

Я решил проблему с помощью saveAsTextFiles и слил их в другом искровом задании. Parquet не работает с KafkaDStreams в моем приложении и выдает ошибку

Burak 25.12.2018 14:42

Другие вопросы по теме