Как я могу обрабатывать метод @KafkaListener в разных потоках?

У меня есть обработчик kafka в весенней загрузке:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

Например, производитель отправляет одно сообщение каждую секунду. Но myService.processResponse работа 10 секунд. Мне нужно обрабатывать каждое сообщение и начинать myService.processResponse в новой теме. Я могу создать своего исполнителя и делегировать ему каждый ответ. Но думаю для них в кафке есть другие конфиги. Я нашел 2:

1) добавить concurrency = "5" к аннотации @KafkaListener - вроде работает. Но я не уверен, насколько правильно, потому что у меня есть второй способ:

2) Я могу создать ConcurrentKafkaListenerContainerFactory и установить на него ConsumerFactory и concurrency

Я не понимаю разницы между этими методами? достаточно просто добавить concurrency = "5" в аннотацию @KafkaListener или нужно создать ConcurrentKafkaListenerContainerFactory?

Или я вообще ничего не понимаю и есть ли другой способ?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
5 707
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Использование исполнителя усложняет управление зафиксированными смещениями; это не рекомендуется.

С @KafkaListener фреймворк создает ConcurrentKafkaListenerContainerFactory для вас.

concurrency на аннотации — это просто удобство; он отменяет заводскую настройку.

Это позволяет использовать одну и ту же фабрику с несколькими слушателями, каждый из которых имеет различный параллелизм.

Вы можете установить параллелизм контейнера (по умолчанию) с помощью свойства загрузки; это значение переопределяется значением аннотации; см. javadocs...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";

еще не до конца понял. То есть, если я аннотировал метод @KafkaListener - каждый дескриптор в этом методе будет в новом потоке? потому что Spring создает по умолчанию ConcurrentKafkaListenerContainerFactory и мне не нужно создавать ConcurrentKafkaListenerContainerFactory ?

ip696 19.03.2019 14:33

Для concurrency=5 нужно минимум 5 партиций по теме; разделы будут распределены по потокам — раздел может быть обработан только одним потоком. Вам не нужно создавать фабрику, потому что загрузчик сделает это за вас с автоматической настройкой.

Gary Russell 19.03.2019 14:40

Использование исполнителя усложняет управление зафиксированными смещениями; это не рекомендуется.

Gary Russell 19.03.2019 14:44

если проще, подскажите, что мне сделать, чтобы получить эту логику - 1) я обрабатываю сообщение в методе @KafkaListener. 2) начать обработку этого сообщения в новом потоке. 3) обрабатывать новое сообщение.... и т.д.

ip696 19.03.2019 14:46

Таким образом вы можете потерять сообщения; мне потребовалось бы слишком много времени, чтобы описать подводные камни и код, который вам нужно написать, чтобы убедиться, что смещения управляются правильно, работают с дубликатами и т. д. и т. д. Я предлагаю вам использовать Kafka Consumer напрямую, а не использовать контейнер прослушивателя.

Gary Russell 19.03.2019 14:50

кажется, я начал немного понимать. вы предлагаете не использовать аннотацию @KafkaListener. Вместо этого мне нужно создать что-то вроде «пула потребителей»? Если это правда, можете ли вы поделиться каким-нибудь примером или статьей об этом?

ip696 19.03.2019 14:53

Извиняюсь; У меня нет времени учить вас тому, как работает Кафка; Я могу помочь только с использованием Spring Kafka. Даже если вы сами напишете код для опроса Consumer, у вас все равно будут те же проблемы с управлением смещениями, если вы передадите работу асинхронным потокам.

Gary Russell 19.03.2019 15:28

Извините, что беспокою вас. Я просто хочу понять, как это сделать в spring kafka. У меня был шедулер. Он опросил сторонний сервис и, получив ответ, передал его новому потоку для обработки. Теперь я перемещаю шедулер в микросервис, и он отправляет ответ кафке. И мой основной микросервис стал потребителем. Добавляю @KafkaListener и обрабатываю ответы. Но раньше я делал это в несколько потоков, а теперь в один. Я просто хочу понять, как я могу быстрее обрабатывать очередь. Поэтому я подумал, что это возможно в spring kafka. Сделай так, чтобы у меня было 5 одинаковых кунсьюмеров в одном приложении

ip696 19.03.2019 15:40

Снова; вы можете увеличить параллелизм, ТОЛЬКО если у вас есть хотя бы такое количество разделов по теме; Kafka не допускает нескольких потребителей в разделе для одной и той же группы. Если вы перейдете к другому потоку, это значительно усложнит управление смещениями, зафиксированными для раздела (разделов). Вам нужно больше узнать о Кафке.

Gary Russell 19.03.2019 15:48

concurrency не имеет ничего общего с одновременной обработкой сообщений, полученных одним и тем же потребителем. Это для групп потребителей, когда у вас есть несколько потребителей, каждый из которых обрабатывает свои собственные разделы.

Передача обработки в отдельный поток очень сложна, и команда Spring-Kafka решила не делать этого «по задумке», я полагаю. Вам даже не нужно копаться в Spring-Kafka, чтобы понять почему. Проверьте документ КафкаПотребительскийОбнаружение отказов потребителей:

Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.

Другие вопросы по теме