Я новичок в потоках кафки. Мне нужно динамически создавать потоки kafka из файлов конфигурации, которые содержат имена исходных и целевых тем. Можно ли перезапустить и остановить потоки Kafka? Моя цель — периодически переносить сообщения из одной темы в другую с помощью потоков kafka. Я использовал работу spring cron и пытался закрыть и открыть поток, но я не могу запустить его снова, когда закрываю поток. Я получил эту ошибку --> Клиент либо уже запущен, либо уже остановлен, не может перезапуститься. Я пишу код на java
+--------------+
+<----- | Created (0) |
| +-----+--------+
| |
| v
| +----+--+------+
| | Re- |
+<----- | Balancing (1)| -------->+
| +-----+-+------+ |
| | ^ |
| v | |
| +--------------+ v
| | Running (2) | -------->+
| +------+-------+ |
| | |
| v |
| +------+-------+ +----+-------+
+-----> | Pending |<--- | Error (5) |
| Shutdown (3) | +------------+
+------+-------+
|
v
+------+-------+
| Not |
| Running (4) |
+--------------+
На самом деле я хочу периодически приостанавливать и возобновлять потоки. Есть ли способ сделать это?
Вы разместили диаграмму состояний потока потоков и говорите «Я получил эту ошибку», не показывая ошибку. Не могли бы вы отредактировать свой вопрос, чтобы просто отразить «Я хочу периодически приостанавливать и возобновлять потоки»?
В классе pause
есть два метода resume
и KafkaStreams
, которые можно использовать для приостановки и возобновления обработки.
https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause--https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--
Вы можете использовать метод scheduleAtFixedRate
из java.util.concurrent.ScheduledExecutorService
, чтобы запланировать паузу и возобновление каждые 5 минут.
Спасибо @ Лукас Бручи. Сработали методы паузы и возобновления.
если вы используете Spring Cloud Stream - вы можете подключиться к BindingsLifecycleController
см. https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html
другой вариант - предоставить элементы управления через конечные точки актуатора и вызвать его (подробнее по той же ссылке)
Покажите соответствующую часть кода и точное сообщение об ошибке. Вопрос слишком расплывчатый, чтобы на него можно было ответить.