Я знакомлюсь с Кафкой через фауст-стриминг. Я создал среду Python 3.10 и кластер redpanda на локальном хосте: 9092. Код, который я использовал, представляет собой демонстрационную версию hello world, представленную в документации faust-streaming. https://faust-streaming.github.io/faust/playbooks/quickstart.html
import faust
app = faust.App(
'hello-world',
broker='kafka://localhost:9092',
value_serializer='raw',
)
greetings_topic = app.topic('greetings')
@app.agent(greetings_topic)
async def greet(greetings):
async for greeting in greetings:
print(greeting)
Когда я запускаю приложение с помощью «faust -A hello_world worker -l info», я получаю ошибку:
[^Worker]: Ошибка: AttributeError("Объект CreateTopicsRequest_v1 не имеет атрибута build_request_header.
Я проверил, может ли проблема быть в кластере, но установка aiokafka с производителем/потребителем, нацеленным на localhost:9092, кажется, работает гладко. Поскольку я не могу найти никаких упоминаний об этой проблеме, я надеюсь, что кто-нибудь здесь сможет помочь мне запустить faust-streaming.
Кроме того, Faust — это проект, управляемый сообществом, и, хотя он стабилен, поддержка Stack Overflow практически отсутствует, если посмотреть на тег faust
.
Существуют и другие альтернативы Faust на Python с открытым исходным кодом, которые стоит проверить. Я работаю над Quix Streams и, как и Faust, это чистая альтернатива Python, не требует серверного кластера, имеет хорошее распространение, и мы сотрудничаем с Redpanda, поэтому в нашем репозитории есть много примеров. Команда сопровождающих постоянно растет, и, что наиболее важно, в группе Slack есть активные члены сообщества, поэтому ответы на запросы поддержки обычно очень быстрые. Мы не просто обсуждаем собственную библиотеку, поэтому я приглашаю присоединиться всех, кто заинтересован в работе с Python и Kafka, независимо от уровня навыков.
Я посмотрел далее на Quix Streaming. Я создал POC с помощью Quix и Redis, что оказалось намного проще, чем я ожидал. В долгосрочной перспективе это может показаться лучшим решением... как с технической точки зрения, так и с точки зрения поддержки сообщества.
Я рад, что вы проверили это и нашли решение самостоятельно! Это звучит неплохо. Я рекомендую передавать данные из Kafka в базу данных для требований отчетности. Однако вы также можете добиться того, что вам нужно, с помощью потоковой обработки, используя функцию с отслеживанием состояния. Функция потокового фрейма данных apply()
имеет параметр stateful
, который вы можете установить на True
, чтобы предоставить вам доступ к объекту состояния, который работает как словарь. Вы можете сохранить последние 5 строк и определить, находится ли максимум в среднем индексе (документация здесь).
Неудачное время, вот и все.
Это проблема управления зависимостями (со стороны faust-streaming). Их пакет (faust-streaming в версии 0.11.0) позволяет использовать любую версию aiokafka >= 0.9.0 , однако aiokafka только что выпустила 0.11.0 две недели назад.
Просто добавьте верхний предел в свою среду, чтобы предотвратить использование последней версии aiokafka до тех пор, пока не будет обновлена faust-streaming.
# Update your requirements file
echo 'aiokafka>=0.9.0,<0.11.0' >> requirements.txt
# Or, just install aiokafka with a version range
pip install 'aiokafka>=0.9.0,<0.11.0'
# Or, if you're using poetry
poetry add 'aiokafka>=0.9.0,<0.11.0'
Редактировать: я просмотрел github и нашел проблему, решение которой вы можете отслеживать: https://github.com/faust-streaming/faust/issues/633#issue-2406150814
Я столкнулся с этим при запуске обновления пакета в проекте, где я уже использую faust-streaming.
Это сделало свою работу! Спасибо
Привет, я сопровождающий faust-streaming
! Мне не повезло со сроками выхода сборок CI/CD из-за aredis
тестов, и выпуск патча был отложен. Это должно быть исправлено в faust-streaming==0.11.1
!
Я просматривал потоки Quix, однако не уверен, поможет ли это мне в моем случае использования. По сути, мне нужно просмотреть последние 5 строк и определить, является ли средняя вершиной. Могут ли потоки Quix решить эту проблему без дополнительного кэширования?