Сообщение Axon получено, но обработчик событий не вызван

Сообщение Axon получено, но обработчик события не вызван.

Я пытаюсь реализовать поиск событий на обеих сторонах с буксировкой другой очереди. Моя первая очередь - контрольная работа, а вторая - testdemo

У меня есть два отдельных приложения, работающих на одном сервере.

  1. Управление пользователями
  2. Управление кошельком

Я реализовал источник событий от управления пользователями до управления кошельком. и он работает нормально.

Теперь я пытаюсь реализовать управление кошельком в UserManagement, это означает, что когда я опубликую событие из управления кошельком (производитель) и (использовать) приложение для управления пользователями. Итак, событие получено, но обработчик события не вызывается.

Ниже приведен код моего приложения. Пожалуйста, помогите мне понять, чего мне будет не хватать.

Мой класс конфигурации Axon

package com.peaas.ngapblueprintdemo.config;

import org.axonframework.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.Channel;

@Configuration
public class AxonConfiguration {

    private final static Logger logger = LoggerFactory.getLogger(AxonConfiguration.class);

    @Value("${axon.amqp.exchange}")
    private String exchange;

    @Bean
    public Exchange exchange() {
        logger.info(exchange + " AMQP Exchange Registering ");
        return ExchangeBuilder.fanoutExchange(exchange).build();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable(exchange).build();
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
    }

    @Autowired
    public void configure(AmqpAdmin amqpAdmin) {
        amqpAdmin.declareExchange(exchange());
        amqpAdmin.declareQueue(queue());
        amqpAdmin.declareBinding(binding());
    }   

    @Bean
    public SpringAMQPMessageSource testdemo(Serializer serializer) {
        System.out.println("--- On Message Call ---");
        return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {

            @RabbitListener(queues = "testdemo")

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println(message.getMessageProperties());
                System.out.println("channel == "+channel);
                super.onMessage(message, channel);
            }
        };
    }
}

Класс WalletCreatedEvent

package com.peaas.ngapblueprintdemo.events;

public class WalletCreatedEvent {
    private Long id;
    private String walletId;
    private Double amount;
    private Long userId;

    public WalletCreatedEvent(Long id, String walletId, Double amount, Long userId) {       
        super();
        System.out.println("--- call ---");
        this.id = id;
        this.walletId = walletId;
        this.amount = amount;
        this.userId = userId;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getWalletId() {
        return walletId;
    }

    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }

    @Override
    public String toString() {
        return "WalletCreatedEvent [id = " + id + ", walletId = " + walletId + ", amount = " + amount + ", userId = " + userId
                + "]";
    }

}

Класс EventHandler

package com.peaas.ngapblueprintdemo.eventHandlers;

import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;

@Component
public class UserEventHandler {

    @EventHandler
    public void onCreateWalletEvent(WalletCreatedEvent event) {
        System.out.println("--- Wallet Created Successfully ---");
        System.out.println(event);
    }   
}

Ниже приведены мои свойства файла application.yml.

axon:
    amqp:
        exchange: test
    eventhandling:
        processors:
            amqpEvents:
                source: testdemo

Ниже приведены данные моего журнала, показывающие, что получено событие.

MessageProperties [headers = {axon-message-id=fa60968c-6905-46b5-8afe-6da853a4c51a, axon-message-aggregate-seq=0, axon-metadata-correlationId=589ef284-176f-49b8-aae0-0ad1588fa735, axon-message-aggregate-type=WalletAggregate, axon-message-revision=null, axon-message-timestamp=2018-08-06T11:09:26.345Z, axon-message-type=com.peaas.ngapblueprintdemo.events.WalletCreatedEvent, axon-metadata-traceId=589ef284-176f-49b8-aae0-0ad1588fa735, axon-message-aggregate-id=9524f7df-44fb-477f-83b8-d176583a126e}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=testdemo, receivedRoutingKey=com.peaas.ngapblueprintdemo.events, deliveryTag=1, consumerTag=amq.ctag-fGm3jQcP_JIoTGf4ZMhAIg, consumerQueue=testdemo]
channel == Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@3dcd657d Shared Rabbit Connection: SimpleConnection@19b12fd2 [delegate=amqp://[email protected]:5672/, localPort= 52963]
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
1 237
1

Ответы 1

У вас есть большая часть правильной конфигурации, но вы забываете привязать свой SpringAMQPMessageSource к процессору событий, под которым находится ваш компонент обработки событий.

См. Правильный пример того, как этого добиться, в справочное руководство.

Вот прямой фрагмент из этого справочного руководства для настройки источника сообщения для обработчика событий:

@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAmqpMessageSource myMessageSource) {
ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}

Редактировать

Думаю, я понимаю, какой части вам не хватает. Вы правильно подключили очередь как подписываемый источник сообщений к обработчику событий. Это следует из вашего application.yml, который связывает источник сообщения testdemo с процессором событий amqpEvents. Таким образом, извините за мое предыдущее предположение по этому поводу.

Причина, по которой вы не получаете свои события в UserEventHandler, заключается в том, что этот обработчик событий не привязан к процессору событий amqpEvents. Чтобы решить эту проблему, вы должны добавить аннотацию @ProcessingGroup("amqpEvents") к компоненту UserEventHandler.

Я пробовал использовать ту же конфигурацию, но проблема все равно осталась.

user2745328 06.08.2018 15:50

Я хотел бы предложить вам предоставить фрагмент того, как вы подписываете свой SubscribingEventProcessor, как я указал, обновив исходный билет, чтобы я мог понять, что вам там не хватает.

Steven 07.08.2018 09:22

Привет, @Steven, у меня такая же конфигурация и привязка SagaConfiguration к моему SpringAMQPMessageSource, но вместо SagaEventHandler ее слушает обработчик событий.

TGW 09.08.2018 08:45

@TGW Я не уверен, понимаю ли я ваш вопрос или утверждение. Вы имеете в виду, что это работает для вас, или что это не работает для вас в данный момент?

Steven 09.08.2018 09:27

Хорошо, на самом деле это сага, которая публикует событие, которое используется другой службой, которая, в свою очередь, возвращает событие подтверждения успеха, но в идеале это событие должно обрабатываться в sagaeventhandler для завершения саги. Но вместо этого он прослушивается только обычным обработчиком событий, а не @Sagaeventhandler.

TGW 09.08.2018 10:03

Я также столкнулся с той же проблемой, касающейся саги, которую не обрабатывает событие. Вот мой репозиторий битбакетов ссылка на сайт

user2745328 09.08.2018 12:17

Если @SagaEventHandler не работает, я могу подумать о двух проблемах, которые могут вызвать это: 1. associationValue не соответствует ни одной существующей саге 2. Ваша сага поддерживается TrackingEventProcessor, который по определению не получает события от SubscribableMessageSource, как очереди AMQP.

Steven 13.08.2018 13:11

Другие вопросы по теме