Как проверить, находится ли сообщение уже в очереди

Есть ли способ проверить, есть ли в очереди кафки определенное сообщение? Я не хочу его потреблять, а просто проверяю, не стоит ли он уже в очереди. Например. мое сообщение представляет собой простой объект JSON:

{
    id: 123,
    name: "message"
}

Итак, я хочу проверить, находится ли сообщение с идентификатором: 123 уже в очереди, поэтому мое приложение не отправляет его во второй раз. У меня есть служба Node.js и я использую библиотеку kafkajs npm.

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
69
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Kafka поддерживает идемпотентность через семантику Idempotent Producer/Exactly Once. Это означает, что сообщения, опубликованные в темах Kafka, не должны дублироваться со стороны производителя. Для потребителей.

флаг: EnableIdempotence = true

не уверен, что это то, что мне нужно. Я заставляю отправлять одно и то же сообщение через код JavaScript. Мне просто нужно знать, существует ли уже такое же сообщение/тело в очереди

John Glabb 05.01.2023 22:27

Это не предотвращает дублирование во всей теме. Только предотвращает повторную попытку того же события в том же вызове производителя.

OneCricketeer 05.01.2023 23:41
Ответ принят как подходящий

Я не хочу его потреблять, а просто проверяю, не стоит ли он уже в очереди

Это невозможно. Вам нужно использовать тему, чтобы проверить существование любого события.

Вам нужно будет потреблять и записывать куда-то еще (Redis, MongoDB и т. д.), а затем запрашивать индекс для предотвращения дублирования с темами Kafka.

Или наоборот, внедрите эту логику в своих нижестоящих потребителей, а не беспокойтесь о том, что относится к теме, а что нет, учитывая тот факт, что записи в конечном итоге удаляются из темы из-за политик хранения.

Может быть ksqlDB может помочь. Он преобразует поток в запрашиваемое состояние.

Но он по-прежнему должен потреблять тему и не предотвращает дублирование.

OneCricketeer 11.01.2023 03:25

Другие вопросы по теме

Похожие вопросы