Сообщение Axon получено, но обработчик события не вызван.
Я пытаюсь реализовать поиск событий на обеих сторонах с буксировкой другой очереди. Моя первая очередь - контрольная работа, а вторая - testdemo
У меня есть два отдельных приложения, работающих на одном сервере.
Я реализовал источник событий от управления пользователями до управления кошельком. и он работает нормально.
Теперь я пытаюсь реализовать управление кошельком в UserManagement, это означает, что когда я опубликую событие из управления кошельком (производитель) и (использовать) приложение для управления пользователями. Итак, событие получено, но обработчик события не вызывается.
Ниже приведен код моего приложения. Пожалуйста, помогите мне понять, чего мне будет не хватать.
Мой класс конфигурации Axon
package com.peaas.ngapblueprintdemo.config;
import org.axonframework.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.rabbitmq.client.Channel;
@Configuration
public class AxonConfiguration {
private final static Logger logger = LoggerFactory.getLogger(AxonConfiguration.class);
@Value("${axon.amqp.exchange}")
private String exchange;
@Bean
public Exchange exchange() {
logger.info(exchange + " AMQP Exchange Registering ");
return ExchangeBuilder.fanoutExchange(exchange).build();
}
@Bean
public Queue queue() {
return QueueBuilder.durable(exchange).build();
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}
@Autowired
public void configure(AmqpAdmin amqpAdmin) {
amqpAdmin.declareExchange(exchange());
amqpAdmin.declareQueue(queue());
amqpAdmin.declareBinding(binding());
}
@Bean
public SpringAMQPMessageSource testdemo(Serializer serializer) {
System.out.println("--- On Message Call ---");
return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
@RabbitListener(queues = "testdemo")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(message.getMessageProperties());
System.out.println("channel == "+channel);
super.onMessage(message, channel);
}
};
}
}
Класс WalletCreatedEvent
package com.peaas.ngapblueprintdemo.events;
public class WalletCreatedEvent {
private Long id;
private String walletId;
private Double amount;
private Long userId;
public WalletCreatedEvent(Long id, String walletId, Double amount, Long userId) {
super();
System.out.println("--- call ---");
this.id = id;
this.walletId = walletId;
this.amount = amount;
this.userId = userId;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getWalletId() {
return walletId;
}
public void setWalletId(String walletId) {
this.walletId = walletId;
}
@Override
public String toString() {
return "WalletCreatedEvent [id = " + id + ", walletId = " + walletId + ", amount = " + amount + ", userId = " + userId
+ "]";
}
}
Класс EventHandler
package com.peaas.ngapblueprintdemo.eventHandlers;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;
import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
@Component
public class UserEventHandler {
@EventHandler
public void onCreateWalletEvent(WalletCreatedEvent event) {
System.out.println("--- Wallet Created Successfully ---");
System.out.println(event);
}
}
Ниже приведены мои свойства файла application.yml.
axon:
amqp:
exchange: test
eventhandling:
processors:
amqpEvents:
source: testdemo
Ниже приведены данные моего журнала, показывающие, что получено событие.
MessageProperties [headers = {axon-message-id=fa60968c-6905-46b5-8afe-6da853a4c51a, axon-message-aggregate-seq=0, axon-metadata-correlationId=589ef284-176f-49b8-aae0-0ad1588fa735, axon-message-aggregate-type=WalletAggregate, axon-message-revision=null, axon-message-timestamp=2018-08-06T11:09:26.345Z, axon-message-type=com.peaas.ngapblueprintdemo.events.WalletCreatedEvent, axon-metadata-traceId=589ef284-176f-49b8-aae0-0ad1588fa735, axon-message-aggregate-id=9524f7df-44fb-477f-83b8-d176583a126e}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=testdemo, receivedRoutingKey=com.peaas.ngapblueprintdemo.events, deliveryTag=1, consumerTag=amq.ctag-fGm3jQcP_JIoTGf4ZMhAIg, consumerQueue=testdemo]
channel == Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@3dcd657d Shared Rabbit Connection: SimpleConnection@19b12fd2 [delegate=amqp://[email protected]:5672/, localPort= 52963]
У вас есть большая часть правильной конфигурации, но вы забываете привязать свой SpringAMQPMessageSource
к процессору событий, под которым находится ваш компонент обработки событий.
См. Правильный пример того, как этого добиться, в справочное руководство.
Вот прямой фрагмент из этого справочного руководства для настройки источника сообщения для обработчика событий:
@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAmqpMessageSource myMessageSource) {
ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}
Редактировать
Думаю, я понимаю, какой части вам не хватает.
Вы правильно подключили очередь как подписываемый источник сообщений к обработчику событий. Это следует из вашего application.yml
, который связывает источник сообщения testdemo
с процессором событий amqpEvents
. Таким образом, извините за мое предыдущее предположение по этому поводу.
Причина, по которой вы не получаете свои события в UserEventHandler
, заключается в том, что этот обработчик событий не привязан к процессору событий amqpEvents
.
Чтобы решить эту проблему, вы должны добавить аннотацию @ProcessingGroup("amqpEvents")
к компоненту UserEventHandler
.
Я хотел бы предложить вам предоставить фрагмент того, как вы подписываете свой SubscribingEventProcessor
, как я указал, обновив исходный билет, чтобы я мог понять, что вам там не хватает.
Привет, @Steven, у меня такая же конфигурация и привязка SagaConfiguration к моему SpringAMQPMessageSource, но вместо SagaEventHandler ее слушает обработчик событий.
@TGW Я не уверен, понимаю ли я ваш вопрос или утверждение. Вы имеете в виду, что это работает для вас, или что это не работает для вас в данный момент?
Хорошо, на самом деле это сага, которая публикует событие, которое используется другой службой, которая, в свою очередь, возвращает событие подтверждения успеха, но в идеале это событие должно обрабатываться в sagaeventhandler для завершения саги. Но вместо этого он прослушивается только обычным обработчиком событий, а не @Sagaeventhandler.
Я также столкнулся с той же проблемой, касающейся саги, которую не обрабатывает событие. Вот мой репозиторий битбакетов ссылка на сайт
Если @SagaEventHandler
не работает, я могу подумать о двух проблемах, которые могут вызвать это: 1. associationValue
не соответствует ни одной существующей саге 2. Ваша сага поддерживается TrackingEventProcessor
, который по определению не получает события от SubscribableMessageSource
, как очереди AMQP.
Я пробовал использовать ту же конфигурацию, но проблема все равно осталась.