У меня есть обработчик kafka в весенней загрузке:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
Например, производитель отправляет одно сообщение каждую секунду. Но myService.processResponse работа 10 секунд. Мне нужно обрабатывать каждое сообщение и начинать myService.processResponse в новой теме. Я могу создать своего исполнителя и делегировать ему каждый ответ. Но думаю для них в кафке есть другие конфиги. Я нашел 2:
1) добавить concurrency = "5" к аннотации @KafkaListener - вроде работает. Но я не уверен, насколько правильно, потому что у меня есть второй способ:
2) Я могу создать ConcurrentKafkaListenerContainerFactory и установить на него ConsumerFactory и concurrency
Я не понимаю разницы между этими методами? достаточно просто добавить concurrency = "5" в аннотацию @KafkaListener или нужно создать ConcurrentKafkaListenerContainerFactory?
Или я вообще ничего не понимаю и есть ли другой способ?




Использование исполнителя усложняет управление зафиксированными смещениями; это не рекомендуется.
С @KafkaListener фреймворк создает ConcurrentKafkaListenerContainerFactory для вас.
concurrency на аннотации — это просто удобство; он отменяет заводскую настройку.
Это позволяет использовать одну и ту же фабрику с несколькими слушателями, каждый из которых имеет различный параллелизм.
Вы можете установить параллелизм контейнера (по умолчанию) с помощью свойства загрузки; это значение переопределяется значением аннотации; см. javadocs...
/**
* Override the container factory's {@code concurrency} setting for this listener. May
* be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
* which case {@link Number#intValue()} is used to obtain the value.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the concurrency.
* @since 2.2
*/
String concurrency() default "";
Для concurrency=5 нужно минимум 5 партиций по теме; разделы будут распределены по потокам — раздел может быть обработан только одним потоком. Вам не нужно создавать фабрику, потому что загрузчик сделает это за вас с автоматической настройкой.
Использование исполнителя усложняет управление зафиксированными смещениями; это не рекомендуется.
если проще, подскажите, что мне сделать, чтобы получить эту логику - 1) я обрабатываю сообщение в методе @KafkaListener. 2) начать обработку этого сообщения в новом потоке. 3) обрабатывать новое сообщение.... и т.д.
Таким образом вы можете потерять сообщения; мне потребовалось бы слишком много времени, чтобы описать подводные камни и код, который вам нужно написать, чтобы убедиться, что смещения управляются правильно, работают с дубликатами и т. д. и т. д. Я предлагаю вам использовать Kafka Consumer напрямую, а не использовать контейнер прослушивателя.
кажется, я начал немного понимать. вы предлагаете не использовать аннотацию @KafkaListener. Вместо этого мне нужно создать что-то вроде «пула потребителей»? Если это правда, можете ли вы поделиться каким-нибудь примером или статьей об этом?
Извиняюсь; У меня нет времени учить вас тому, как работает Кафка; Я могу помочь только с использованием Spring Kafka. Даже если вы сами напишете код для опроса Consumer, у вас все равно будут те же проблемы с управлением смещениями, если вы передадите работу асинхронным потокам.
Извините, что беспокою вас. Я просто хочу понять, как это сделать в spring kafka. У меня был шедулер. Он опросил сторонний сервис и, получив ответ, передал его новому потоку для обработки. Теперь я перемещаю шедулер в микросервис, и он отправляет ответ кафке. И мой основной микросервис стал потребителем. Добавляю @KafkaListener и обрабатываю ответы. Но раньше я делал это в несколько потоков, а теперь в один. Я просто хочу понять, как я могу быстрее обрабатывать очередь. Поэтому я подумал, что это возможно в spring kafka. Сделай так, чтобы у меня было 5 одинаковых кунсьюмеров в одном приложении
Снова; вы можете увеличить параллелизм, ТОЛЬКО если у вас есть хотя бы такое количество разделов по теме; Kafka не допускает нескольких потребителей в разделе для одной и той же группы. Если вы перейдете к другому потоку, это значительно усложнит управление смещениями, зафиксированными для раздела (разделов). Вам нужно больше узнать о Кафке.
concurrency не имеет ничего общего с одновременной обработкой сообщений, полученных одним и тем же потребителем. Это для групп потребителей, когда у вас есть несколько потребителей, каждый из которых обрабатывает свои собственные разделы.
Передача обработки в отдельный поток очень сложна, и команда Spring-Kafka решила не делать этого «по задумке», я полагаю. Вам даже не нужно копаться в Spring-Kafka, чтобы понять почему. Проверьте документ КафкаПотребительскийОбнаружение отказов потребителей:
Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.
еще не до конца понял. То есть, если я аннотировал метод
@KafkaListener- каждый дескриптор в этом методе будет в новом потоке? потому что Spring создает по умолчаниюConcurrentKafkaListenerContainerFactoryи мне не нужно создаватьConcurrentKafkaListenerContainerFactory?