Я НЕ могу остановить потребителя JMS
динамически, используя конечную точку Spring Boot
REST.
Количество потребителей остается прежним. Исключений тоже нет.
IBM MQ Version: 9.2.0.5
пом.xml
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.0.8</version>
</dependency>
JmsConfig.java
@Configuration
@EnableJms
@Log4j2
public class JmsConfig {
@Bean
public MQQueueConnectionFactory mqQueueConnectionFactory() {
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName("my-ibm-mq-host.com");
try {
mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
mqQueueConnectionFactory.setCCSID(1208);
mqQueueConnectionFactory.setChannel("my-channel");
mqQueueConnectionFactory.setPort(1234);
mqQueueConnectionFactory.setQueueManager("my-QM");
} catch (Exception e) {
log.error("Exception while creating JMS connecion...", e.getMessage());
}
return mqQueueConnectionFactory;
}
}
JmsListenerConfig.java
@Configuration
@Log4j2
public class JmsListenerConfig implements JmsListenerConfigurer {
@Autowired
private JmsConfig jmsConfig;
private Map<String, String> queueMap = new HashMap<>();
@Bean
public DefaultJmsListenerContainerFactory mqJmsListenerContainerFactory() throws JMSException {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(jmsConfig.mqQueueConnectionFactory());
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setSessionTransacted(true);
factory.setConcurrency("5");
return factory;
}
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
queueMap.put("my-queue-101", "101");
log.info("queueMap: " + queueMap);
queueMap.entrySet().forEach(e -> {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setDestination(e.getKey());
endpoint.setId(e.getValue());
try {
log.info("Reading message....");
endpoint.setMessageListener(message -> {
try {
log.info("Receieved ID: {} Destination {}", message.getJMSMessageID(), message.getJMSDestination());
} catch (JMSException ex) {
log.error("Exception while reading message - " + ex.getMessage());
}
});
registrar.setContainerFactory(mqJmsListenerContainerFactory());
} catch (JMSException ex) {
log.error("Exception while reading message - " + ex.getMessage());
}
registrar.registerEndpoint(endpoint);
});
}
}
JmsController.java
@RestController
@RequestMapping("/jms")
@Log4j2
public class JmsController {
@Autowired
ApplicationContext context;
@RequestMapping(value = "/stop", method = RequestMethod.GET)
public @ResponseBody
String haltJmsListener() {
JmsListenerEndpointRegistry listenerEndpointRegistry = context.getBean(JmsListenerEndpointRegistry.class);
Set<String> containerIds = listenerEndpointRegistry.getListenerContainerIds();
log.info("containerIds: " + containerIds);
//stops all consumers
listenerEndpointRegistry.stop(); //DOESN'T WORK :(
//stops a consumer by id, used when there are multiple consumers and want to stop them individually
//listenerEndpointRegistry.getListenerContainer("101").stop(); //DOESN'T WORK EITHER :(
return "Jms Listener stopped";
}
}
Вот результат, который я заметил.
http://localhost:8080/jms/stop
общее количество потребителей: 1 (НЕ, как ожидалось, должно вернуться к 0)Я пропустил какую-либо конфигурацию?
Вам также нужно вызвать shutDown
на контейнере; см. мой комментарий к этому ответу «isActive» DefaultMessageListenerContainer против «isRunning»
start()/stop()
установить/сброситьrunning
;initialize()/shutDown()
установить/сброситьactive
. Это зависит от ваших требований.stop()
просто останавливает потребителей от получения новых сообщений, но потребители все еще существуют.shutDown()
закрывает потребителей. Большинство людей звонятstop + shutdown
, а затемinitialize + start
, чтобы перезапустить. Но если вы просто хотите ненадолго перестать есть, стоп/старт — это все, что вам нужно.
Вам нужно будет перебрать контейнеры и привести их к вызову shutDown()
.
Спасибо, Гэри, как мне вызвать метод shutDown()
из listenerEndpointRegistry
? Не уверен, к чему приводить.
Понял, неважно. DefaultMessageListenerContainer dmlc = (DefaultMessageListenerContainer) listenerEndpointRegistry.getListenerContainer(containerId); dmlc.shutdown();
Соответствует ли результат
log.info("containerIds: " + containerIds);
"101"
?