Блокировать утилиту GetMapping для получения сообщения от Redis при загрузке Spring

У меня есть две службы: Служба A и Служба B, использующие Redis для связи между службами.

Служба A с использованием Spring загрузки.

Служба B с использованием expressjs (это похоже на потребителя, потребляет сообщение, а затем публикует сообщение, уведомляющее службу A: «Эй, служба A, я закончил»)

Служба А должна предоставить две конечные точки: /api-publish и /api-consume. Когда пользователь отправляет запрос к конечной точке /api-publish, Служба A должна опубликовать сообщение в теме Redis с именем topic1.

Служба B должна быть настроена на получение сообщений от topic1. После успешной обработки сообщения от topic1 Служба B должна опубликовать сообщение в другой теме Redis с именем topic2.

Все, что я хочу сделать — запросить конечную точку /api-consume в Службе A, служба должна подождать, пока Служба B успешно не опубликует сообщение для topic2. Только тогда Служба А должна вернуть ответ пользователю.

Вот моя конфигурация Redis в сервисе A:

@Configuration
public class RedisConfig {

    @Value("${spring.data.redis.host}")
    private String redisHost;

    @Value("${spring.data.redis.port}")
    private int redisPort;

    @Bean
    public RedisConnectionFactory lettuceConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort));
    }

    @Bean
    public RedisTemplate<?, ?> redisTemplate() {
        RedisTemplate<?, ?> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("topic2");
    }

    @Bean
    RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container
                = new RedisMessageListenerContainer();
        container.setConnectionFactory(lettuceConnectionFactory());
        container.addMessageListener(messageListener(), topic());
        return container;
    }

}

Вот сервис, который получает сообщение из темы 2:

@Service
public class RedisMessageSubscriber implements MessageListener {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String receivedMessage = new String(message.getBody());
        try {
            queue.put(receivedMessage);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public String waitForMessage(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }
}

Вот простой контроллер:

@RestController
public class TestController {
    @Autowired
    private RedisTemplate<String, Object> template;
    @Autowired
    private RedisMessageSubscriber subscriber;

    @GetMapping("/api/v1/public/test")
    public String test() {
        try {
            String message = subscriber.waitForMessage(5, TimeUnit.SECONDS);
            if (message != null) {
                return "Received message from Service B: " + message;
            } else {
                return "Timeout waiting for message from Service B";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Failed to receive message";
        }
    }
}

я уже отредактировал свой вопрос

ginbarca 19.07.2024 05:58

Даже тогда вопрос все еще не очень сфокусирован. "ensure that messages are not lost in the process and handle potential errors or exceptions that might occur during the publish/subscribe operations. Consider edge cases such as what should happen if Service B is down or if it takes too long to process a message." Вы хоть представляете, насколько это широко и расплывчато?

BIBOO unit 19.07.2024 06:00

извините, потому что я не знаю ключевого слова, чтобы описать. я удалил это

ginbarca 19.07.2024 06:03

Все, что я хочу сделать запрос к конечной точке /api-consume в Службе A, служба должна подождать, пока Служба B успешно не опубликует сообщение в теме2. Только тогда Служба А должна вернуть ответ пользователю. Я уже использовал expressjs для Сервиса A и использую eventEmitter. Когда Служба А получила сообщение из темы, я отправлю событие и в контроллере прослушаю это событие. Но при загрузке Spring я не знаю, как это сделать.

ginbarca 19.07.2024 06:05

Да, а что не так с вашим текущим кодом? Это не работает? Как это не работает? Если вы хотите, чтобы оно дождалось публикации сообщения, просто используйте метод poll() без таймаутов.

BIBOO unit 19.07.2024 06:08

Я вызываю метод waitForMessage в контроллере, но когда я вызываю API в почтальоне, запрос был видеовстречей без возврата какого-либо ответа. Он загружается навсегда

ginbarca 19.07.2024 06:25

Можете ли вы поставить точку останова в методе waitForMessage и посмотреть, где он навсегда зависнет во время отладки?

BIBOO unit 19.07.2024 07:24

да. Я поставил точку останова как в контроллере, так и в методе waitForMessage onMessage. Когда я начинаю вызывать API, сначала дело доходит до контроллера -> waitForMessage. В методе waitForMessage очередь пуста, когда я публикую сообщение в redis-cli, в методе waitForMessage ничего не произошло, очередь все еще пуста

ginbarca 19.07.2024 08:19
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
8
56
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я подозреваю, что у вас есть 2 экземпляра RedisMessageSubscriber. Ваш контроллер вводит @Service аннотированный, но сообщения приходят на:

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new RedisMessageSubscriber()); // <--- this one
}

Итак, чтобы использовать только один экземпляр, вам необходимо:

  • Скиньте аннотацию @ServiceRedisMessageSubscriber
  • Немного подкорректируйте RedisConfig вот так:
@Bean
public RedisMessageSubscriber redisMessageSubscriber() {
    return new RedisMessageSubscriber();
} 

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(redisMessageSubscriber());
}

Но если я не инициализирую RedisMessageSubscriber в MessageListenerAdapter, это будет ошибка, потому что MessageListenerAdapter приму MessageListener

ginbarca 19.07.2024 10:28

Я понимаю. В RedisMessageSubscriber я добавляю @Service, чтобы Spring получил его как боб. Также я инициализирую new RedisMessageSubscriber(), поэтому у меня работает 2 экземпляра. Но как я могу гарантировать, что сообщения не будут потеряны в процессе, и обработать потенциальные ошибки или исключения, которые могут возникнуть во время операций публикации/подписки? Также рассмотрите крайние случаи, например, что должно произойти, если служба B не работает или если обработка сообщения занимает слишком много времени.

ginbarca 19.07.2024 12:17

Другие вопросы по теме

Похожие вопросы

Пытаюсь запустить сервер Minecraft и постоянно получаю сообщение об ошибке: невозможно получить доступ к jarfile
Spring Cloud Gateway с клиентом обнаружения Kubernetes для запросов http и grpc
Невозможно создать массив функций в Java 8
Понимание порядка выполнения с помощью .then(Mono{})
Изменение часового пояса не приводит к тому, что LocalTime.now() возвращает другое значение, почему?
Может ли один и тот же код, использующий HttpClient, скомпилироваться с Spring-5.x и 6.x?
Улучшите качество подписи, извлеченной с помощью OpenCV из отсканированного листа бумаги
Переверните последовательные нули в единицы за k операций, чтобы получить максимальное количество единиц, найдите максимальное количество единиц
Влияет ли расстояние между операторами, отступами и комментариями в коде на его временную и пространственную сложность?
Почему мой код не удаляет zip-файл, созданный моим модульным тестом?