Я пытаюсь создать аспект и обработать каждую запись, которую мое приложение использовало через Kafka.
Этот метод не работает
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
public class Test{
@Around("execution(* org.springframework.kafka.listener.MessageListener.onMessage(..))")
public Object onInvoke(ProceedingJoinPoint joinPoint) throws Throwable {
log.debug("In aspect");
Object[] args = joinPoint.getArgs();
return joinPoint.proceed();
}
}
Я также пробовал получать записи по аспектам с аннотацией @KafkaListener. Он работает, как и ожидалось, но я также хочу определять имена тем для записей (я использую RetryTopicConfiguration и @KafkaListener(topics = "mainTopic") ).
@KafkaListener
не использует MessageListener
, и даже если бы это было так, я все еще не управлял им как bean-компонентом в контексте приложения, поэтому создание аспекта для этого типа - не лучший вариант.
У вас другое решение метода @KafkaListener
правильное. Вы можете указать тему, из которой будет использоваться запись, с помощью дополнительного аргумента метода против @Header(KafkaHeaders.RECEIVED_TOPIC)
.
Существует также абстракция RecordInterceptor
, которую нужно внедрить в AbstractKafkaListenerContainerFactory
, используемую для упомянутой @KafkaListener
инфраструктуры.
Спасибо, RecordInterceptor
это то, что нужно.
Вы забыли указать целевой класс. Это Spring-управляется?