Чтение только определенных сообщений из темы кафки

Сценарий:

Я пишу данные объекта данных JSON в тему kafka во время чтения. Я хочу прочитать только определенный набор сообщений на основе значения, присутствующего в сообщении. Я использую библиотеку kafka-python.

образцы сообщений:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

Здесь я хочу читать только сообщения, имеющие статус flow_Status как завершенный.

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
0
8 598
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Вы можете создать две разные темы; один для завершенного, а другой для состояния отказа. А потом читать сообщения из заполненных тем, чтобы обрабатывать их.

В противном случае, если вы хотите, чтобы они были в одной теме и хотите читать только завершенные, я считаю, что вам нужно прочитать их все и игнорировать неудачные, используя простое условие if-else.

Ответ принят как подходящий

В Кафке сделать что-то подобное невозможно. Потребитель потребляет сообщения одно за другим, одно за другим, начиная с последнего зафиксированного смещения (или с самого начала, или ища по определенному смещению). Зависит от вашего варианта использования, возможно, у вас может быть другой поток в вашем сценарии: сообщение, выполняющее процесс, переходит в тему, но затем приложение, которое обрабатывает действие, записывает результат (завершено или не выполнено) в двух разных темах. : таким образом у вас все завершено отделяется от неудавшегося. Другой способ - использовать приложение Kafka Streams для фильтрации, но, принимая во внимание, что это просто сахар, на самом деле приложение потоков всегда будет читать все сообщения, но позволяет легко фильтровать сообщения.

поэтому у меня может быть 3 темы: 1 для всего журнала, 1 для состояния завершения, 1 для состояния сбоя... задание будет писать в тему 1, а затем фильтровать данные на основе статуса в другую тему.

Prabhanj 18.02.2019 10:00

точно, каким-то образом статус для вас - это тип сообщения, который заслуживает другой темы в этом случае использования (один для выполненного и один для отказа)

ppatierno 18.02.2019 10:03

это хороший подход, иметь единую тему с двумя разделами (один для завершения, один для отказа), при отправке будет сохраняться логика в производителе для отправки данных в соответствующие разделы... на конце потребителя будут созданы отдельные Consumer_groups, одна группа для чтение из неисправного раздела и другие для чтения из завершенного раздела

Prabhanj 19.02.2019 10:44

сторона производителя может быть хороша, да, но для этого вам нужно реализовать собственный разделитель. На стороне потребителя все наоборот: два потребителя должны быть в одной группе потребителей, чтобы каждому из них был назначен один раздел. Если они входят в разные группы потребителей, они будут получать все сообщения из обоих разделов. В любом случае это не работает, потому что, если один потребитель выйдет из строя, другой получит другой раздел (получив завершенные и неудачные сообщения). Вы можете не использовать группы потребителей, а напрямую назначать разделы.

ppatierno 19.02.2019 10:52

Потребитель Kafka не поддерживает такие функции заранее. Вам придется потреблять все события последовательно, отфильтровывать завершенные события статуса и помещать их куда-нибудь. Вместо этого вы можете рассмотреть возможность использования приложения Kafka Streams, где вы можете читать данные в виде потока и фильтровать события, где flow_status = «completed», и публиковать в какой-либо выходной теме или в другом месте назначения.

Пример :

KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));

P.S. У Kafka нет официального релиза Python API для KStream, но есть проект с открытым исходным кодом: https://github.com/wintoncode/winton-kafka-streams

На сегодняшний день невозможно достичь этого на стороне брокера, есть запрос функции Jira, открытый для apache kafka, чтобы реализовать эту функцию, вы можете отслеживать ее здесь, я надеюсь, что они реализуют это в ближайшем будущем: https://issues.apache.org/jira/browse/KAFKA-6020

Я считаю, что лучший способ - использовать интерфейс RecordFilterStrategy (Java/spring) и фильтровать его на стороне потребителя.

Другие вопросы по теме