Пример VertX, когда вам нужно запросить несколько асинхронных ресурсов и использовать их все за одну операцию:
Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture.completer());
Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.completer());
CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
if (ar.succeeded()) {
// All servers started
} else {
// At least one server failed
}
});
Нам нужно запросить две разные базы данных, а затем использовать результаты в бизнес-логике, но поток эквивалентен.
Что эквивалентно VertX / RxJava?
В настоящее время люди делают это, вставляя новый вызов .flatMap () каждый раз, когда им нужна новая переменная. У меня осталось чувство, что должен быть способ получше ...
На самом деле нам не нужно, чтобы запросы выполнялись одновременно, но нам нужно кэшировать оба результата и каким-то образом передавать их бизнес-логике одновременно.
Как бы вы использовали это в контексте VertX / RxJava?
Я не понимаю, что вы подразумеваете под «в контексте», и на самом деле я не знаю, почему у Vert.x все еще есть свое будущее даже после перехода на Java 8. Но вы можете завершить CompletableFutures в асинхронных обратных вызовах vertx, а затем запустить ...allOf(futures.toArray(new CompletableFuture[0])).thenApply($ -> (action on success)).exceptionally(t -> (action on exception if needed)), это просто стандартные операции CompletableFuture. Учитывая, что это все асинхронно, вы должны иметь возможность делать то же самое с RxJava, если знаете, какое будущее завершить.




есть много способов сделать это, но я попытался выбрать подход, близкий к вашему образцу:
@Override
public void start(Future<Void> startFuture) throws Exception {
final HttpServer httpServer = vertx.createHttpServer();
final Completable initializeHttpServer = httpServer.rxListen().toCompletable();
final NetServer netServer = vertx.createNetServer();
final Completable initializeNetServer = netServer.rxListen().toCompletable();
initializeHttpServer.andThen(initializeNetServer)
.subscribe(
() -> { /* All servers started */ },
error -> { /* At least one server failed */ }
);
}
Вызовы rxListen() преобразуются в экземпляры Completable, которые затем запускаются последовательно после подписки.
onComplete абонента будет вызван, когда оба сервера завершат привязку к своим соответствующим портам, или ...onError будет вызван, если произойдет исключение(также, fwiw, «вложение» операций flatMap для чего-то столь тривиального, как это не должно быть необходимым. «связывание» таких операций, однако, было бы идиоматическим использованием).
надеюсь, это поможет!
--ОБНОВИТЬ--
прочитав вопрос более внимательно, я теперь вижу, что вы действительно спрашивали о том, как обрабатывать результаты двух дискретных асинхронных операций.
Альтернативой flatMap для комбинирования результатов было бы использование оператора zip, например:
@Override
public void start(Future<Void> startFuture) throws Exception {
final Single<String> dbQuery1 = Single.fromCallable(() -> { return "db-query-result-1"; });
final Single<String> dbQuery2 = Single.fromCallable(() -> { return "db-query-result-2"; });
Single.zip(dbQuery1, dbQuery2, (result1, result2) -> {
// handle the results from both db queries
// (with Pair being a standard tuple-like class)
return new Pair(result1, result2);
})
.subscribe(
pair -> {
// handle the results
},
error -> {
// something went wrong
}
);
}
согласно документы, zip позволяет вам указать серию реактивных типов (Single, Observable и т. д.) вместе с функцией для преобразования всех результатов сразу, при этом основная идея заключается в том, что он не будет испускать ничего, пока все источники не испускают один раз (или больше, в зависимости от типа реакции).
Вы можете использовать
CompletableFuture.allOf(CompleatableFuture<?>...): CompletableFuture<Void>? Это будет стандартная Java 8.