Если я создаю и запускаю экземпляр KafkaStream, а затем при вызове ловушки выключения .close (), ничего не происходит - журнал показывает, что один существующий StreamThread перешел в состояние PENDING_SHUTDOWN, но затем просто сидит так навсегда. Я пробовал все безуспешно - прочитал исходный код потоков Kafka, чтобы увидеть, что он делает во время выключения, но, на мой взгляд, код подразумевает, что StreamThread, если он находится в рабочем состоянии, никогда не остановится (что не может быть правильным - Я не видел подобных ошибок в Kafka JIRA).
Вот соответствующий код моего простого приложения KafkaStream (scala):
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
p
}
implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())
val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")
in.map((k,v) =>{
println("Consumed and transforming value")
(k,s"$v_transformed")
}).to("output-topic")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.stop())
Как видите, он просто читает из одной темы, вносит небольшое изменение в значение из потребляемой записи и записывает его в другую тему.
Поток запускается, и приложение вызывает stop (), когда ему отправляется SIGINT.
Когда я ^ C, чтобы завершить процесс, я вижу, что в журнале Kafka говорится, что StreamThread-1 переводится в PENDING_SHUTDOWN, и это все, что когда-либо было. В конечном итоге (в течение нескольких секунд) он должен достичь состояния NOT_RUNNING, но этого не происходит, и он продолжает выводить мой оператор println для каждой записи, которую он продолжает читать из темы ввода.
Что я здесь делаю не так?
ОБНОВЛЕНИЕ: согласно предложению комментатора, я попытался вызвать close () с тайм-аутом 60 секунд и в конце концов получил это, но все еще не выключился: -
[shutdownHook1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [-] Streams client cannot stop completely within the timeout
Спасибо, Матиас, но я просто изо всех сил пытаюсь заставить его что-нибудь сделать прямо сейчас - я действительно попытался ввести явный тайм-аут в 60 секунд, просто чтобы посмотреть, что произойдет, и все, что произошло, было то, что он продолжал обрабатывать материал в течение 60 секунд, а затем зарегистрировал, что не может перейти в состояние в течение тайм-аута.
Не уверен, в чем может быть проблема. Какую версию ты используешь?
Версия 2.0.0 - в конце концов я отказался и вместо этого использую соединитель Alpakka Kafka - он все равно лучше работает с Futures и использует полнофункциональные потоки Akka, поэтому больше подходит для того, что я делаю.
Вы можете попасть в тупиковую ситуацию, и вам следует применить тайм-аут к операциям
close()
. Cf issues.apache.org/jira/browse/KAFKA-5571 и issues.apache.org/jira/browse/KAFKA-4366