Ошибка потоковой передачи Faust в демонстрационном коде hello_world: объект не имеет атрибута build_request_header

Я знакомлюсь с Кафкой через фауст-стриминг. Я создал среду Python 3.10 и кластер redpanda на локальном хосте: 9092. Код, который я использовал, представляет собой демонстрационную версию hello world, представленную в документации faust-streaming. https://faust-streaming.github.io/faust/playbooks/quickstart.html

import faust

app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
    value_serializer='raw',
)

greetings_topic = app.topic('greetings')

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)

Когда я запускаю приложение с помощью «faust -A hello_world worker -l info», я получаю ошибку:

[^Worker]: Ошибка: AttributeError("Объект CreateTopicsRequest_v1 не имеет атрибута build_request_header.

Я проверил, может ли проблема быть в кластере, но установка aiokafka с производителем/потребителем, нацеленным на localhost:9092, кажется, работает гладко. Поскольку я не могу найти никаких упоминаний об этой проблеме, я надеюсь, что кто-нибудь здесь сможет помочь мне запустить faust-streaming.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
195
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Кроме того, Faust — это проект, управляемый сообществом, и, хотя он стабилен, поддержка Stack Overflow практически отсутствует, если посмотреть на тег faust.

Существуют и другие альтернативы Faust на Python с открытым исходным кодом, которые стоит проверить. Я работаю над Quix Streams и, как и Faust, это чистая альтернатива Python, не требует серверного кластера, имеет хорошее распространение, и мы сотрудничаем с Redpanda, поэтому в нашем репозитории есть много примеров. Команда сопровождающих постоянно растет, и, что наиболее важно, в группе Slack есть активные члены сообщества, поэтому ответы на запросы поддержки обычно очень быстрые. Мы не просто обсуждаем собственную библиотеку, поэтому я приглашаю присоединиться всех, кто заинтересован в работе с Python и Kafka, независимо от уровня навыков.

Я просматривал потоки Quix, однако не уверен, поможет ли это мне в моем случае использования. По сути, мне нужно просмотреть последние 5 строк и определить, является ли средняя вершиной. Могут ли потоки Quix решить эту проблему без дополнительного кэширования?

T1mminat0r 14.07.2024 20:33

Я посмотрел далее на Quix Streaming. Я создал POC с помощью Quix и Redis, что оказалось намного проще, чем я ожидал. В долгосрочной перспективе это может показаться лучшим решением... как с технической точки зрения, так и с точки зрения поддержки сообщества.

T1mminat0r 15.07.2024 23:01

Я рад, что вы проверили это и нашли решение самостоятельно! Это звучит неплохо. Я рекомендую передавать данные из Kafka в базу данных для требований отчетности. Однако вы также можете добиться того, что вам нужно, с помощью потоковой обработки, используя функцию с отслеживанием состояния. Функция потокового фрейма данных apply() имеет параметр stateful, который вы можете установить на True, чтобы предоставить вам доступ к объекту состояния, который работает как словарь. Вы можете сохранить последние 5 строк и определить, находится ли максимум в среднем индексе (документация здесь).

stereosky 17.07.2024 10:32
Ответ принят как подходящий

Неудачное время, вот и все.

Это проблема управления зависимостями (со стороны faust-streaming). Их пакет (faust-streaming в версии 0.11.0) позволяет использовать любую версию aiokafka >= 0.9.0 , однако aiokafka только что выпустила 0.11.0 две недели назад.

Просто добавьте верхний предел в свою среду, чтобы предотвратить использование последней версии aiokafka до тех пор, пока не будет обновлена ​​faust-streaming.

# Update your requirements file
echo 'aiokafka>=0.9.0,<0.11.0' >> requirements.txt

# Or, just install aiokafka with a version range
pip install 'aiokafka>=0.9.0,<0.11.0'

# Or, if you're using poetry
poetry add 'aiokafka>=0.9.0,<0.11.0'

Редактировать: я просмотрел github и нашел проблему, решение которой вы можете отслеживать: https://github.com/faust-streaming/faust/issues/633#issue-2406150814

Я столкнулся с этим при запуске обновления пакета в проекте, где я уже использую faust-streaming.

Zev Isert 14.07.2024 21:35

Это сделало свою работу! Спасибо

T1mminat0r 15.07.2024 10:57

Привет, я сопровождающий faust-streaming! Мне не повезло со сроками выхода сборок CI/CD из-за aredis тестов, и выпуск патча был отложен. Это должно быть исправлено в faust-streaming==0.11.1!

wbarnha 17.07.2024 20:53

Другие вопросы по теме