Не удалось преобразовать Streamz Kafka Stream в Dask Stream

Я не могу преобразовать поток Streamz в поток Dask, созданный с использованием кода Kafka source.PFB.

from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
       {'bootstrap.servers': 'kafkaXX:9092',
        'group.id': 'streamz'}) 
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()

Я получаю это сообщение об ошибке

ValueError: Two different event loops active
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
0
333
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Источник kafka, если не указано иное, запустит собственный цикл обработки событий в потоке. Вызов Client() также делает это. Чтобы передать цикл от одного к другому, вы можете сделать

Stream.from_kafka(..., loop=client.loop)

Обратите внимание, что для вызова .scatter() также требуется явный доступ к циклу событий, но, поскольку это зависит от dask, он знает, что нужно использовать цикл любого активного клиента.

Другие вопросы по теме

Похожие вопросы