Эквивалент VertX CompositeFuture в RxJava

Пример VertX, когда вам нужно запросить несколько асинхронных ресурсов и использовать их все за одну операцию:

Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture.completer());

Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.completer());

CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
  if (ar.succeeded()) {
    // All servers started
  } else {
    // At least one server failed
  }
});

Нам нужно запросить две разные базы данных, а затем использовать результаты в бизнес-логике, но поток эквивалентен.

Что эквивалентно VertX / RxJava?

В настоящее время люди делают это, вставляя новый вызов .flatMap () каждый раз, когда им нужна новая переменная. У меня осталось чувство, что должен быть способ получше ...

На самом деле нам не нужно, чтобы запросы выполнялись одновременно, но нам нужно кэшировать оба результата и каким-то образом передавать их бизнес-логике одновременно.

Вы можете использовать CompletableFuture.allOf(CompleatableFuture<?>...): CompletableFuture<Void>? Это будет стандартная Java 8.

Oleg Sklyar 03.05.2018 16:52

Как бы вы использовали это в контексте VertX / RxJava?

Sparky 03.05.2018 16:54

Я не понимаю, что вы подразумеваете под «в контексте», и на самом деле я не знаю, почему у Vert.x все еще есть свое будущее даже после перехода на Java 8. Но вы можете завершить CompletableFutures в асинхронных обратных вызовах vertx, а затем запустить ...allOf(futures.toArray(new CompletableFuture[0])).thenApply($ -> (action on success)).exceptionally(t -> (action on exception if needed)), это просто стандартные операции CompletableFuture. Учитывая, что это все асинхронно, вы должны иметь возможность делать то же самое с RxJava, если знаете, какое будущее завершить.

Oleg Sklyar 03.05.2018 17:00
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
3
443
1

Ответы 1

есть много способов сделать это, но я попытался выбрать подход, близкий к вашему образцу:

@Override
public void start(Future<Void> startFuture) throws Exception {
    final HttpServer httpServer = vertx.createHttpServer();
    final Completable initializeHttpServer = httpServer.rxListen().toCompletable();

    final NetServer netServer = vertx.createNetServer();
    final Completable initializeNetServer = netServer.rxListen().toCompletable();

    initializeHttpServer.andThen(initializeNetServer)
        .subscribe(
            ()    -> { /* All servers started */ },
            error -> { /* At least one server failed */ }
        );
}

Вызовы rxListen() преобразуются в экземпляры Completable, которые затем запускаются последовательно после подписки.

  • обратный вызов onComplete абонента будет вызван, когда оба сервера завершат привязку к своим соответствующим портам, или ...
  • обратный вызов onError будет вызван, если произойдет исключение

(также, fwiw, «вложение» операций flatMap для чего-то столь тривиального, как это не должно быть необходимым. «связывание» таких операций, однако, было бы идиоматическим использованием).

надеюсь, это поможет!

--ОБНОВИТЬ--

прочитав вопрос более внимательно, я теперь вижу, что вы действительно спрашивали о том, как обрабатывать результаты двух дискретных асинхронных операций.

Альтернативой flatMap для комбинирования результатов было бы использование оператора zip, например:

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        final Single<String> dbQuery1 = Single.fromCallable(() -> { return "db-query-result-1"; });
        final Single<String> dbQuery2 = Single.fromCallable(() -> { return "db-query-result-2"; });

        Single.zip(dbQuery1, dbQuery2, (result1, result2) -> {
            // handle the results from both db queries
            // (with Pair being a standard tuple-like class)
           return new Pair(result1, result2);
        })
            .subscribe(
                pair -> {
                    // handle the results
                },
                error -> {
                    // something went wrong
                }
            );
    }

согласно документы, zip позволяет вам указать серию реактивных типов (Single, Observable и т. д.) вместе с функцией для преобразования всех результатов сразу, при этом основная идея заключается в том, что он не будет испускать ничего, пока все источники не испускают один раз (или больше, в зависимости от типа реакции).

Другие вопросы по теме