Spring аспект интерфейса прослушивателя Kafka

Я пытаюсь создать аспект и обработать каждую запись, которую мое приложение использовало через Kafka.

Этот метод не работает

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

@Slf4j
@Aspect
@Component
public class Test{

    @Around("execution(* org.springframework.kafka.listener.MessageListener.onMessage(..))")
    public Object onInvoke(ProceedingJoinPoint  joinPoint) throws Throwable {
        log.debug("In aspect");
        Object[] args = joinPoint.getArgs();
       return joinPoint.proceed();
    }
}

Я также пробовал получать записи по аспектам с аннотацией @KafkaListener. Он работает, как и ожидалось, но я также хочу определять имена тем для записей (я использую RetryTopicConfiguration и @KafkaListener(topics = "mainTopic") ).

Вы забыли указать целевой класс. Это Spring-управляется?

kriegaex 16.04.2024 17:36
0
1
67
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

@KafkaListener не использует MessageListener, и даже если бы это было так, я все еще не управлял им как bean-компонентом в контексте приложения, поэтому создание аспекта для этого типа - не лучший вариант.

У вас другое решение метода @KafkaListener правильное. Вы можете указать тему, из которой будет использоваться запись, с помощью дополнительного аргумента метода против @Header(KafkaHeaders.RECEIVED_TOPIC).

Существует также абстракция RecordInterceptor, которую нужно внедрить в AbstractKafkaListenerContainerFactory, используемую для упомянутой @KafkaListener инфраструктуры.

Спасибо, RecordInterceptor это то, что нужно.

Ytug Ilya 17.04.2024 10:26

Другие вопросы по теме