Есть ли способ установить задержку для сообщения, отправляемого производителем kafka?

Или, может быть, даже способ отложить получение сообщения потребителем. Мне нужно выполнять вызов функции в nodejs через каждые 90 секунд, поэтому я хочу добавить задержку в 90 секунд для каждого сообщения kafka

Итак, установите интервал ожидания или таймер в цикле опроса потребителей (или производителя, но я бы рекомендовал потребителя) ... Вы не пробовали это?

OneCricketeer 08.05.2018 03:06

Я хочу, чтобы задержка произошла в конце кафки, а не в конце моего основного проекта. Я думаю, что интервал таймера и сон, которые вы предложили, будут внутри основного проекта

Samarth Juneja 08.05.2018 06:59

У вас есть два конца трубы Кафки. Я обычно предлагаю получать события в реальном времени как можно быстрее, тогда потребители могут быть настолько медленными, насколько они хотят, в течение периода хранения темы.

OneCricketeer 08.05.2018 07:03

Меня беспокоит не сохранение темы, а задержка получения в самом сообщении. Вот что мне нужно. Мне нужен вызов функции после тайм-аута 90 секунд. Моя проблема в том, что я не могу найти эту функциональность в кафке

Samarth Juneja 08.05.2018 07:25

Сожалею, что такого не существует в Кафке. Вам нужно реализовать это самостоятельно как внешний клиент.

OneCricketeer 08.05.2018 07:28

Например, в Spark Streaming вы можете установить размер окна пакета 90 секунд. Я уверен, что в аналогичных фреймворках вы можете определять временные окна, к которым может применяться агрегатная функция, если это то, что вы пытаетесь сделать.

OneCricketeer 08.05.2018 07:30
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
9
6
2 361
1

Ответы 1

Вы можете использовать конфигурацию KafkaProducer linger.ms, которая описывается как:

"[...] adds a small amount of artificial delay — that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. [...]"

Эта конфигурация по умолчанию - 0 и может быть установлена ​​на любое значение long.

В зависимости от количества данных, которые ваш производитель отправляет в тему, вы также можете захотеть увеличить конфигурацию размер партии. В противном случае, если этот предел размера достигнут до истечения задержки linger.ms, KafkaProducer отправит сообщения до истечения 90 секунд.

Имейте в виду, что если вы увеличиваете linger.ms, вы также можете захотеть увеличить delivery.timeout.ms. Согласно его документация:

"The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms."

Другие вопросы по теме