Как я могу периодически приостанавливать и возобновлять обработку потока (каждые 5 минут) с помощью Kafka Streams и Spring Kafka Streams?

Я новичок в потоках кафки. Мне нужно динамически создавать потоки kafka из файлов конфигурации, которые содержат имена исходных и целевых тем. Можно ли перезапустить и остановить потоки Kafka? Моя цель — периодически переносить сообщения из одной темы в другую с помощью потоков kafka. Я использовал работу spring cron и пытался закрыть и открыть поток, но я не могу запустить его снова, когда закрываю поток. Я получил эту ошибку --> Клиент либо уже запущен, либо уже остановлен, не может перезапуститься. Я пишу код на java

         +--------------+
               +<----- | Created (0)  |
               |       +-----+--------+
               |             |
               |             v
               |       +----+--+------+
               |       | Re-          |
               +<----- | Balancing (1)| -------->+
               |       +-----+-+------+          |
               |             | ^                 |
               |             v |                 |
               |       +--------------+          v
               |       | Running (2)  | -------->+
               |       +------+-------+          |
               |              |                  |
               |              v                  |
               |       +------+-------+     +----+-------+
               +-----> | Pending      |<--- | Error (5)  |
                       | Shutdown (3) |     +------------+
                       +------+-------+
                              |
                              v
                       +------+-------+
                       | Not          |
                       | Running (4)  |
                       +--------------+

Покажите соответствующую часть кода и точное сообщение об ошибке. Вопрос слишком расплывчатый, чтобы на него можно было ответить.

Lucas Brutschy 21.11.2022 15:15

На самом деле я хочу периодически приостанавливать и возобновлять потоки. Есть ли способ сделать это?

ozdemirahime 26.11.2022 10:43

Вы разместили диаграмму состояний потока потоков и говорите «Я получил эту ошибку», не показывая ошибку. Не могли бы вы отредактировать свой вопрос, чтобы просто отразить «Я хочу периодически приостанавливать и возобновлять потоки»?

Lucas Brutschy 28.11.2022 10:41
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
3
148
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

В классе pause есть два метода resume и KafkaStreams, которые можно использовать для приостановки и возобновления обработки.

https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause--https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--

Вы можете использовать метод scheduleAtFixedRate из java.util.concurrent.ScheduledExecutorService, чтобы запланировать паузу и возобновление каждые 5 минут.

Спасибо @ Лукас Бручи. Сработали методы паузы и возобновления.

ozdemirahime 04.12.2022 18:57

если вы используете Spring Cloud Stream - вы можете подключиться к BindingsLifecycleController

см. https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html

другой вариант - предоставить элементы управления через конечные точки актуатора и вызвать его (подробнее по той же ссылке)

Другие вопросы по теме