Кафка поиск текста в теме

Я хочу искать определенные сообщения в теме кафки, Единственное решение, которое я нашел, это использование grep.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |grep 'world\|hello'
  1. Есть ли эффективный способ сделать это?
  2. Есть ли способ, которым я могу ограничить потребителя определенным смещением, то есть чтением с самого начала до тех пор, пока не будет достигнуто определенное смещение?

Я использую этот инструмент github.com/fgeller/кт. Он позволяет выполнять различные манипуляции со смещением при чтении из темы. Grep - это то, как я это делаю.

pwaterz 12.02.2019 15:59

Если вы выполняете операцию фильтрации, используйте Streams API. Если вы действительно хотите использовать все сообщения, используйте Consumers API. Вы не должны выполнять операции на основе смещений. Производитель, отправивший 1000-е сообщение, не обязательно будет иметь 1000-е смещение в разделе. stackoverflow.com/questions/54544074/…stackoverflow.com/questions/54636524/…

JR ibkr 12.02.2019 21:29
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
2
8 250
2

Ответы 2

Is there an efficient way do do it ?

да. Ваше решение - быстрое и грязное решение. Если вы хотите фильтровать данные, используйте Streams API и пишите отфильтрованную информацию по другой теме. https://kafka.apache.org/documentation/streams/

чем это отличается от моего быстрого и грязного решения?

igx 14.02.2019 06:37

Я не думаю, что вопрос касается написания кода Java. Найти событие только из CLI

OneCricketeer 14.02.2019 11:09

кафка - это не просто его высокоуровневый апис. совершенно не обязательно для этого создавать новую тему.

aran 14.02.2019 11:23

Ну, я думал, что ОП регулярно вытаскивает вещи из темы. Может ли кто-нибудь помочь мне с stackoverflow.com/questions/54674867/…?

JR ibkr 14.02.2019 15:21

Is there an efficient way do do it ?

Если у вас нет ключей сообщений, то нет.

Если вы это сделаете, вы можете вычислить хэш Murmur2, найти номер раздела и просканировать только его, все еще выполняя поиск с помощью --partition.

Is there a way that I can limit the consumer with a specific offset, meaning reading from the beginning until in reaches specific offset ?

Вы можете дать --max-messages

Если вы не хотите начинать всегда с самого начала, добавьте --group и продолжайте выполнять ту же команду с параметром max messages. Это позволит использовать одну и ту же группу потребителей и зафиксировать смещения, когда это будет сделано.

Вы также можете вручную зафиксировать смещения, чтобы начать с помощью команды kafka-consumer-groups.

Другие вопросы по теме