Реактивные потоки: spring webflux - подпишитесь на существующего издателя

В настоящее время я переношу нашу существующую асинхронную архитектуру REST Spring на новую библиотеку WebFlux Spring, и у меня есть вопрос по объединению нескольких запросов, чтобы они могли прослушивать один и тот же опубликованный ответ.

Пример использования выглядит следующим образом:

  1. Клиент A подключается к нашему веб-серверу и запрашивает данные
  2. Мы обращаемся к нашему кешу, чтобы проверить, есть ли там данные
  3. Мы этого не делаем, поэтому мы идем и получаем эти данные (клиент A подписался и ждет ответа)
  4. Клиент B подключается к нашему веб-серверу и запрашивает те же данные (обращается к той же конечной точке)
  5. Проверяем кеш, данных все равно нет
  6. Поскольку мы уже получаем эти данные для клиента A, мы не хотим делать еще один запрос, однако мы также не хотим отказываться от клиента B. Клиент B должен иметь возможность прослушивать ту же информацию

Как клиент B может подписаться на тот же поток ответов, которого ожидает клиент A?

Не могли бы вы дать фрагменты кода, чтобы показать, как выглядит API библиотеки кеширования? Кроме того, каково здесь ожидаемое поведение? Я предполагаю, что кеш может не содержать необработанный ответ для отправки клиенту, поэтому речь идет о совместном использовании кешированных данных на уровне службы, а не обязательно на веб-уровне?

Brian Clozel 13.04.2018 19:18

Мне просто интересно, возможно ли это с реактивными паттернами. Кэш используется в примере, чтобы показать, почему запрос может не получить немедленный ответ. Мне интересно, если два запроса попадают в одну и ту же конечную точку, возможно ли с помощью WebFlux присоединиться к ним и позволить им обоим ждать одного и того же ответа? Очевидно, этого нельзя сделать с классическими REST и Spring, но я надеялся, что такое поведение возможно с реактивными потоками.

wild_nothing 14.04.2018 12:42
4
2
2 304
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

«Клиент А подписался и ждет ответа» Я полагаю, что запрос закодирован как Mono, и клиент A буквально подписывается на него:

Subscriber<Response> clientA = ... Mono<Response> request = makeRequest(...); request.subscribe(clientA);

тогда clientB должен подписаться таким же образом:

Subscriber<Response> clientB = ... request.subscribe(clientB);

Причем в кеше должны находиться не ранее сохраненные данные ответа, а сами запросы типа Mono<Response>. Затем, если такой запрос находится в кеше, новые клиенты просто подписываются на него, независимо от того, был ли этот запрос уже выполнен или нет.

«Кэш» в этом примере - это просто очень легкая структура данных Hazelcast IMap. Он заполняется данными из внешнего источника тогда и только тогда, когда упомянутых данных там еще нет. Я пытаюсь сделать так, чтобы вызов внешних данных (и заполнение кеша) не выполнялся дважды самым элегантным образом. Есть множество способов решить эту проблему, но я хотел бы знать, смогу ли я решить ее простым способом с помощью WebFlux - моя идея состоит в том, чтобы два запроса прослушивали одну и ту же последовательность событий. Я не могу удалить существующий IMap, но вы думаете, мне следует добавить новый, содержащий Mono <R>?

wild_nothing 15.04.2018 23:09

Так же, как процессоры имеют несколько уровней кеширования, вы можете добавить еще один уровень кеша для текущих запросов, содержащих Mono <R>, не касаясь IMap. Это может быть простой HashMap.

Alexei Kaigorodov 16.04.2018 00:25

Интересная идея. Спасибо.

wild_nothing 16.04.2018 09:41

Вы можете просто переместить полученный ответ из кеша для Mono в основной кеш: request.subscribe(response)->{ imapCache.put(response); responseCache.remove(request); }

Alexei Kaigorodov 17.04.2018 13:48

Другие вопросы по теме