У меня есть проект springboot + spring cloud streams, там у меня настроены несколько прослушивателей потоков, и я хочу добиться перехвата каждого входящего сообщения, чтобы извлечь из него заголовок и что-то сделать с его значением до того, как сообщение будет фактически обработанный. Для этого я создал совет АОП следующим образом:
@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class StreamDyeContextPropagator {
@Pointcut(value = "@annotation(listener) && execution(* com.mycompany.subpackage..*(org.springframework.messaging.Message+)) && args(message)", argNames = "listener, message")
public void streamListener(StreamListener listener, Message<?> message) {
}
@Around(value = "streamListener(listener, message)", argNames = "pjp, listener, message")
public Object retrieveDye(final ProceedingJoinPoint pjp, StreamListener listener, Message<?> message) throws Throwable {
// Some logic here
}
}
Ниже приведен код прослушивателя весенних облачных потоков:
@EnableBinding(ExchangeRateSink.class)
public class ExchangeRateFromStreamListener {
@Loggable
@StreamListener(ExchangeRateSink.NEWEXCHANGERATE)
public void handle(Message<NewExchangeRateMessage> message) {
// Some logic here
}
Но когда я запускаю проект, я получаю следующее исключение:
Caused by: java.lang.IllegalStateException: Required to bind 3 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation) at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605) ...
Любая помощь, пожалуйста?
PS: я знаю, что могу использовать @GlobalChannelInterceptor, чтобы сделать что-то подобное, но я хочу сделать это через АОП, и я хочу знать, что здесь происходит. Заранее спасибо!
Ну, @StreamListener сам по себе приводит к прокси, так что он не будет работать, не слишком углубляясь в сорняки. @GlobalChannelInterceptor является естественным и рекомендуемым решением в контексте весеннего облачного потока (SCSt).
Я не уверен, что @StreamListener, приводящий к тому, что прокси-сервер помешает ему работать, поскольку у меня есть другой аспект для выполнения ведения журнала, который действует по тому же методу. Смысл в том, чтобы делать это с помощью аспектов, заключается в том, что, как я только что сказал, другой аспект заботится о регистрации (также HTTP-запросов), и мне нужна эта функция для извлечения «уникального идентификатора запроса» из входящих сообщений и добавления его в MDC, чтобы он присутствовал в каждой трассировке журнала. Таким образом, если мы добавим его через перехватчик канала, в каком порядке они будут выполняться? Я не могу это указать.




Я тестирую и обнаруживаю, что вы не можете использовать @Order(Ordered.HIGHEST_PRECEDENCE), поскольку Ordered.HIGHEST_PRECEDENCE обычно используется для инициализации запуска.
Я предлагаю вам заменить его на @Order(1).
Не ясно, является ли это вашей проблемой, но это была проблема для меня, и этот вопрос является первым результатом Google, поэтому я публикую его здесь как ответ.
Кажется, вы не можете вызывать proceedingJoinPoint.proceed() откуда угодно, кроме метода, аннотированного @Around. Вы не можете вызвать его из лямбды. Вы не можете передать его методу в другом классе.
Я не проверял, но вы даже не сможете передать его другому методу в том же классе.
Разве вам не нужно указывать pointcut в аннотации
@Around?