Как я могу периодически приостанавливать и возобновлять обработку потока (каждые 5 минут) с помощью Kafka Streams и Spring Kafka Streams?

Я новичок в потоках кафки. Мне нужно динамически создавать потоки kafka из файлов конфигурации, которые содержат имена исходных и целевых тем. Можно ли перезапустить и остановить потоки Kafka? Моя цель — периодически переносить сообщения из одной темы в другую с помощью потоков kafka. Я использовал работу spring cron и пытался закрыть и открыть поток, но я не могу запустить его снова, когда закрываю поток. Я получил эту ошибку --> Клиент либо уже запущен, либо уже остановлен, не может перезапуститься. Я пишу код на java

         +--------------+
               +<----- | Created (0)  |
               |       +-----+--------+
               |             |
               |             v
               |       +----+--+------+
               |       | Re-          |
               +<----- | Balancing (1)| -------->+
               |       +-----+-+------+          |
               |             | ^                 |
               |             v |                 |
               |       +--------------+          v
               |       | Running (2)  | -------->+
               |       +------+-------+          |
               |              |                  |
               |              v                  |
               |       +------+-------+     +----+-------+
               +-----> | Pending      |<--- | Error (5)  |
                       | Shutdown (3) |     +------------+
                       +------+-------+
                              |
                              v
                       +------+-------+
                       | Not          |
                       | Running (4)  |
                       +--------------+

Покажите соответствующую часть кода и точное сообщение об ошибке. Вопрос слишком расплывчатый, чтобы на него можно было ответить.

Lucas Brutschy 21.11.2022 15:15

На самом деле я хочу периодически приостанавливать и возобновлять потоки. Есть ли способ сделать это?

ozdemirahime 26.11.2022 10:43

Вы разместили диаграмму состояний потока потоков и говорите «Я получил эту ошибку», не показывая ошибку. Не могли бы вы отредактировать свой вопрос, чтобы просто отразить «Я хочу периодически приостанавливать и возобновлять потоки»?

Lucas Brutschy 28.11.2022 10:41
Редкие достижения на Github ✨
Редкие достижения на Github ✨
Редкая коллекция доступна в профиле на GitHub ✨
LeetCode запись решения 2536. Увеличение подматриц на единицу
LeetCode запись решения 2536. Увеличение подматриц на единицу
Увеличение подматриц на единицу - LeetCode
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
Как включить TLS в gRPC-клиенте и сервере : 2
Как включить TLS в gRPC-клиенте и сервере : 2
Здравствуйте! 🙏🏻 Надеюсь, у вас все хорошо и добро пожаловать в мой блог.
0
3
148
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

В классе pause есть два метода resume и KafkaStreams, которые можно использовать для приостановки и возобновления обработки.

Https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause-- https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--

Вы можете использовать метод scheduleAtFixedRate из java.util.concurrent.ScheduledExecutorService, чтобы запланировать паузу и возобновление каждые 5 минут.

Спасибо @ Лукас Бручи. Сработали методы паузы и возобновления.

ozdemirahime 04.12.2022 18:57

Если вы используете Spring Cloud Stream - вы можете подключиться к BindingsLifecycleController

См. https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html

Другой вариант - предоставить элементы управления через конечные точки актуатора и вызвать его (подробнее по той же ссылке)

Другие вопросы по теме