Я не могу преобразовать поток Streamz в поток Dask, созданный с использованием кода Kafka source.PFB.
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
{'bootstrap.servers': 'kafkaXX:9092',
'group.id': 'streamz'})
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()
Я получаю это сообщение об ошибке
ValueError: Two different event loops active

Источник kafka, если не указано иное, запустит собственный цикл обработки событий в потоке. Вызов Client() также делает это. Чтобы передать цикл от одного к другому, вы можете сделать
Stream.from_kafka(..., loop=client.loop)
Обратите внимание, что для вызова .scatter() также требуется явный доступ к циклу событий, но, поскольку это зависит от dask, он знает, что нужно использовать цикл любого активного клиента.