Я разрабатываю приложение для весенней загрузки, которое будет слушать ibm mq с
@JmsListener(id = "abc", destination = "${queueName}", containerFactory = "defaultJmsListenerContainerFactory")
У меня есть JmsListenerEndpointRegistry, который запускает listenerContainer.
В сообщении будет пытаться отправить то же сообщение с некоторой бизнес-логикой в кафку. Код плаката:
kafkaTemplate.send(kafkaProp.getTopic(), uniqueId, message)
Теперь, если производитель kafka выйдет из строя, я хочу, чтобы мое загрузочное приложение было остановлено. Итак, я добавил обычай
setErrorHandler.
Итак, я попробовал
`System.exit(1)`, `configurableApplicationContextObject.close()`, `Runtime.getRuntime.exit(1)`.
Но ни один из них не работает. Ниже приведен журнал, который создается после
System.exit(0) или выше других.
2018-05-24 12:12:47.981 INFO 18904 --- [ Thread-4] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d08376: startup date [Thu May 24 12:10:35 IST 2018]; root of context hierarchy
2018-05-24 12:12:48.027 INFO 18904 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.a.k.clients.producer.KafkaProducer : Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-24 12:12:48.044 INFO 18904 --- [ Thread-4] o.a.k.clients.producer.KafkaProducer : Closing the Kafka producer with timeoutMillis = 30000 ms.
Но приложение все еще работает, и ниже находятся запущенные потоки
Daemon Thread [Tomcat JDBC Pool Cleaner[14341596:1527144039908]] (Running)
Thread [DefaultMessageListenerContainer-1] (Running)
Thread [DestroyJavaVM] (Running)
Daemon Thread [JMSCCThreadPoolMaster] (Running)
Daemon Thread [RcvThread: com.ibm.mq.jmqi.remote.impl.RemoteTCPConnection@12474910[qmid=*******,fap=**,channel=****,ccsid=***,sharecnv=***,hbint=*****,peer=*******,localport=****,ssl=****]] (Running)
Thread [Thread-4] (Running)
Помощь очень ценится. Заранее спасибо. Я просто хочу, чтобы приложение закрылось.
Ниже приведен дамп потока перед вызовом System.exit(1).
"DefaultMessageListenerContainer-1"
java.lang.Thread.State: RUNNABLE
at sun.management.ThreadImpl.getThreadInfo1(Native Method)
at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:174)
at com.QueueErrorHandler.handleError(QueueErrorHandler.java:42)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:931)
at org.springframework.jms.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:902)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:235)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.lang.Thread.run(Thread.java:745)




Вы должны сделать дамп потока, чтобы увидеть, что делает Thread [DefaultMessageListenerContainer-1] (Running).
Now in case a kafka producer fails
Что за неудача? Если брокер не работает, поток по умолчанию блокируется в библиотеке производителя на срок до 60 секунд.
Вы можете сократить это время, установив свойство производителя max.block.ms.
Не помещайте подобные вещи в комментарии - это нечитабельно - вместо этого отредактируйте свой вопрос, чтобы добавить дамп потока. Включите название потока и т. д.
Пара решений, которые помогли мне решить выше.
Решения 1. Получите все потоки в обработчике ошибок и прервите их все, а затем создайте систему.
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
for (ThreadInfo threadInfo : threadInfos) {
Thread.currentThread().interrupt();
}
System.exit(1);
Решение 2. Определите диспетчер контекста приложения. Нравиться
public class AppContextManager implements ApplicationContextAware {
private static ApplicationContext _appCtx;
@Override
public void setApplicationContext(ApplicationContext ctx){
_appCtx = ctx;
}
public static ApplicationContext getAppContext(){
return _appCtx;
}
public static void exit(Integer exitCode) {
System.exit(SpringApplication.exit(_appCtx,() -> exitCode));
}
}
Затем используйте тот же менеджер для выхода из обработчика ошибок
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
jmsListenerEndpointRegistry.stop();
AppContextManager.exit(-1);
}
});
Спасибо. Да, я хочу, чтобы приложение не сработало, если мой продюсер не разместит сообщение на kafka. Это также может быть неудача брокера. Пробовал с
ProducerConfig.MAX_BLOCK_MS_CONFIG, 500Но не повезло.