У меня есть две службы: Служба A и Служба B, использующие Redis для связи между службами.
Служба A с использованием Spring загрузки.
Служба B с использованием expressjs (это похоже на потребителя, потребляет сообщение, а затем публикует сообщение, уведомляющее службу A: «Эй, служба A, я закончил»)
Служба А должна предоставить две конечные точки: /api-publish
и /api-consume
. Когда пользователь отправляет запрос к конечной точке /api-publish
, Служба A должна опубликовать сообщение в теме Redis с именем topic1
.
Служба B должна быть настроена на получение сообщений от topic1
. После успешной обработки сообщения от topic1
Служба B должна опубликовать сообщение в другой теме Redis с именем topic2
.
Все, что я хочу сделать — запросить конечную точку /api-consume
в Службе A, служба должна подождать, пока Служба B успешно не опубликует сообщение для topic2
. Только тогда Служба А должна вернуть ответ пользователю.
Вот моя конфигурация Redis в сервисе A:
@Configuration
public class RedisConfig {
@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private int redisPort;
@Bean
public RedisConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort));
}
@Bean
public RedisTemplate<?, ?> redisTemplate() {
RedisTemplate<?, ?> template = new RedisTemplate<>();
template.setConnectionFactory(lettuceConnectionFactory());
template.setHashKeySerializer(new StringRedisSerializer());
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
return template;
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Bean
ChannelTopic topic() {
return new ChannelTopic("topic2");
}
@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container
= new RedisMessageListenerContainer();
container.setConnectionFactory(lettuceConnectionFactory());
container.addMessageListener(messageListener(), topic());
return container;
}
}
Вот сервис, который получает сообщение из темы 2:
@Service
public class RedisMessageSubscriber implements MessageListener {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
@Override
public void onMessage(Message message, byte[] pattern) {
String receivedMessage = new String(message.getBody());
try {
queue.put(receivedMessage);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public String waitForMessage(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
}
Вот простой контроллер:
@RestController
public class TestController {
@Autowired
private RedisTemplate<String, Object> template;
@Autowired
private RedisMessageSubscriber subscriber;
@GetMapping("/api/v1/public/test")
public String test() {
try {
String message = subscriber.waitForMessage(5, TimeUnit.SECONDS);
if (message != null) {
return "Received message from Service B: " + message;
} else {
return "Timeout waiting for message from Service B";
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Failed to receive message";
}
}
}
Даже тогда вопрос все еще не очень сфокусирован. "ensure that messages are not lost in the process and handle potential errors or exceptions that might occur during the publish/subscribe operations. Consider edge cases such as what should happen if Service B is down or if it takes too long to process a message."
Вы хоть представляете, насколько это широко и расплывчато?
извините, потому что я не знаю ключевого слова, чтобы описать. я удалил это
Все, что я хочу сделать запрос к конечной точке /api-consume в Службе A, служба должна подождать, пока Служба B успешно не опубликует сообщение в теме2. Только тогда Служба А должна вернуть ответ пользователю. Я уже использовал expressjs для Сервиса A и использую eventEmitter. Когда Служба А получила сообщение из темы, я отправлю событие и в контроллере прослушаю это событие. Но при загрузке Spring я не знаю, как это сделать.
Да, а что не так с вашим текущим кодом? Это не работает? Как это не работает? Если вы хотите, чтобы оно дождалось публикации сообщения, просто используйте метод poll()
без таймаутов.
Я вызываю метод waitForMessage в контроллере, но когда я вызываю API в почтальоне, запрос был видеовстречей без возврата какого-либо ответа. Он загружается навсегда
Можете ли вы поставить точку останова в методе waitForMessage и посмотреть, где он навсегда зависнет во время отладки?
да. Я поставил точку останова как в контроллере, так и в методе waitForMessage onMessage. Когда я начинаю вызывать API, сначала дело доходит до контроллера -> waitForMessage. В методе waitForMessage очередь пуста, когда я публикую сообщение в redis-cli
, в методе waitForMessage ничего не произошло, очередь все еще пуста
Я подозреваю, что у вас есть 2 экземпляра RedisMessageSubscriber
.
Ваш контроллер вводит @Service
аннотированный, но сообщения приходят на:
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber()); // <--- this one
}
Итак, чтобы использовать только один экземпляр, вам необходимо:
@Service
RedisMessageSubscriber
RedisConfig
вот так:@Bean
public RedisMessageSubscriber redisMessageSubscriber() {
return new RedisMessageSubscriber();
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(redisMessageSubscriber());
}
Но если я не инициализирую RedisMessageSubscriber
в MessageListenerAdapter
, это будет ошибка, потому что MessageListenerAdapter
приму MessageListener
Я понимаю. В RedisMessageSubscriber
я добавляю @Service
, чтобы Spring получил его как боб. Также я инициализирую new RedisMessageSubscriber()
, поэтому у меня работает 2 экземпляра. Но как я могу гарантировать, что сообщения не будут потеряны в процессе, и обработать потенциальные ошибки или исключения, которые могут возникнуть во время операций публикации/подписки? Также рассмотрите крайние случаи, например, что должно произойти, если служба B не работает или если обработка сообщения занимает слишком много времени.
я уже отредактировал свой вопрос