Сценарий:
Я пишу данные объекта данных JSON в тему kafka во время чтения. Я хочу прочитать только определенный набор сообщений на основе значения, присутствующего в сообщении. Я использую библиотеку kafka-python.
образцы сообщений:
{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}
Здесь я хочу читать только сообщения, имеющие статус flow_Status как завершенный.
Вы можете создать две разные темы; один для завершенного, а другой для состояния отказа. А потом читать сообщения из заполненных тем, чтобы обрабатывать их.
В противном случае, если вы хотите, чтобы они были в одной теме и хотите читать только завершенные, я считаю, что вам нужно прочитать их все и игнорировать неудачные, используя простое условие if-else.
В Кафке сделать что-то подобное невозможно. Потребитель потребляет сообщения одно за другим, одно за другим, начиная с последнего зафиксированного смещения (или с самого начала, или ища по определенному смещению). Зависит от вашего варианта использования, возможно, у вас может быть другой поток в вашем сценарии: сообщение, выполняющее процесс, переходит в тему, но затем приложение, которое обрабатывает действие, записывает результат (завершено или не выполнено) в двух разных темах. : таким образом у вас все завершено отделяется от неудавшегося. Другой способ - использовать приложение Kafka Streams для фильтрации, но, принимая во внимание, что это просто сахар, на самом деле приложение потоков всегда будет читать все сообщения, но позволяет легко фильтровать сообщения.
точно, каким-то образом статус для вас - это тип сообщения, который заслуживает другой темы в этом случае использования (один для выполненного и один для отказа)
это хороший подход, иметь единую тему с двумя разделами (один для завершения, один для отказа), при отправке будет сохраняться логика в производителе для отправки данных в соответствующие разделы... на конце потребителя будут созданы отдельные Consumer_groups, одна группа для чтение из неисправного раздела и другие для чтения из завершенного раздела
сторона производителя может быть хороша, да, но для этого вам нужно реализовать собственный разделитель. На стороне потребителя все наоборот: два потребителя должны быть в одной группе потребителей, чтобы каждому из них был назначен один раздел. Если они входят в разные группы потребителей, они будут получать все сообщения из обоих разделов. В любом случае это не работает, потому что, если один потребитель выйдет из строя, другой получит другой раздел (получив завершенные и неудачные сообщения). Вы можете не использовать группы потребителей, а напрямую назначать разделы.
Потребитель Kafka не поддерживает такие функции заранее. Вам придется потреблять все события последовательно, отфильтровывать завершенные события статуса и помещать их куда-нибудь. Вместо этого вы можете рассмотреть возможность использования приложения Kafka Streams, где вы можете читать данные в виде потока и фильтровать события, где flow_status = «completed», и публиковать в какой-либо выходной теме или в другом месте назначения.
Пример :
KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));
P.S. У Kafka нет официального релиза Python API для KStream, но есть проект с открытым исходным кодом: https://github.com/wintoncode/winton-kafka-streams
На сегодняшний день невозможно достичь этого на стороне брокера, есть запрос функции Jira, открытый для apache kafka, чтобы реализовать эту функцию, вы можете отслеживать ее здесь, я надеюсь, что они реализуют это в ближайшем будущем: https://issues.apache.org/jira/browse/KAFKA-6020
Я считаю, что лучший способ - использовать интерфейс RecordFilterStrategy (Java/spring) и фильтровать его на стороне потребителя.
поэтому у меня может быть 3 темы: 1 для всего журнала, 1 для состояния завершения, 1 для состояния сбоя... задание будет писать в тему 1, а затем фильтровать данные на основе статуса в другую тему.