KafkaStreams не выключается

Если я создаю и запускаю экземпляр KafkaStream, а затем при вызове ловушки выключения .close (), ничего не происходит - журнал показывает, что один существующий StreamThread перешел в состояние PENDING_SHUTDOWN, но затем просто сидит так навсегда. Я пробовал все безуспешно - прочитал исходный код потоков Kafka, чтобы увидеть, что он делает во время выключения, но, на мой взгляд, код подразумевает, что StreamThread, если он находится в рабочем состоянии, никогда не остановится (что не может быть правильным - Я не видел подобных ошибок в Kafka JIRA).

Вот соответствующий код моего простого приложения KafkaStream (scala):

val props: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
 p
}


implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())

val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")

in.map((k,v) =>{ 
         println("Consumed and transforming value")
         (k,s"$v_transformed") 
     }).to("output-topic")

val streams: KafkaStreams = new KafkaStreams(builder.build(), props)

streams.start()

sys.addShutdownHook(streams.stop())

Как видите, он просто читает из одной темы, вносит небольшое изменение в значение из потребляемой записи и записывает его в другую тему.

Поток запускается, и приложение вызывает stop (), когда ему отправляется SIGINT.

Когда я ^ C, чтобы завершить процесс, я вижу, что в журнале Kafka говорится, что StreamThread-1 переводится в PENDING_SHUTDOWN, и это все, что когда-либо было. В конечном итоге (в течение нескольких секунд) он должен достичь состояния NOT_RUNNING, но этого не происходит, и он продолжает выводить мой оператор println для каждой записи, которую он продолжает читать из темы ввода.

Что я здесь делаю не так?

ОБНОВЛЕНИЕ: согласно предложению комментатора, я попытался вызвать close () с тайм-аутом 60 секунд и в конце концов получил это, но все еще не выключился: -

[shutdownHook1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [-] Streams client cannot stop completely within the timeout

Вы можете попасть в тупиковую ситуацию, и вам следует применить тайм-аут к операциям close(). Cf issues.apache.org/jira/browse/KAFKA-5571 и issues.apache.org/jira/browse/KAFKA-4366

Matthias J. Sax 12.09.2018 23:08

Спасибо, Матиас, но я просто изо всех сил пытаюсь заставить его что-нибудь сделать прямо сейчас - я действительно попытался ввести явный тайм-аут в 60 секунд, просто чтобы посмотреть, что произойдет, и все, что произошло, было то, что он продолжал обрабатывать материал в течение 60 секунд, а затем зарегистрировал, что не может перейти в состояние в течение тайм-аута.

AndyMoose 13.09.2018 02:55

Не уверен, в чем может быть проблема. Какую версию ты используешь?

Matthias J. Sax 13.09.2018 07:29

Версия 2.0.0 - в конце концов я отказался и вместо этого использую соединитель Alpakka Kafka - он все равно лучше работает с Futures и использует полнофункциональные потоки Akka, поэтому больше подходит для того, что я делаю.

AndyMoose 15.09.2018 23:41
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
4
733
0

Другие вопросы по теме