Я изучаю Rx-Java2 с Vert.x, и я хотел бы связать получение успешной конфигурации с некоторыми параллельными задачами.
Я создал метод, который ищет конфигурацию и возвращает одну подписку на нее, и он отлично работает. Но сомневаюсь, где и как вызывать последующие задачи:
public void start(Future<Void> startFuture) throws Exception {
Single<JsonObject> configSingle = prepareConfigurationAsync();
configSingle.subscribe(onSuccess -> {
System.out.println(onSuccess);
--> Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...);
--> Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...);
}, onError -> {
startFuture.fail(onError);
}));
То, как я это сделал, похоже, работает, но без параллелизма. как я мог этого добиться?
Как и где мне разместить эти подписки?




Переход к другому источнику обычно осуществляется через flatMap. Параллельная работа часто выполняется с помощью zip или merge. В вашем случае я не думаю, что вам нужно значение внутреннего Single как часть вывода, поэтому вы можете попробовать это:
Completable config = prepareConfigurationAsync()
.flatMapCompletable(success ->
System.out.println(success);
return Completable.mergeArray (
prepareLongAsyncTask1(success)
.doOnSuccess(innerSuccess -> /* ... */)
.toCompletable(),
prepareLongAsyncTask2(success)
.doOnComplete(() -> /* ... */)
)
);
config
.subscribe( () -> /* completed */, error -> /* error'd */);
Вау, очень интересно ... Подписка внутренних задач выполняется "автоматически" оператором слияния?
Да. Всякий раз, когда вы чувствуете, что хотите подписаться на источник в методе subscribe или doOnX, вам нужно чаще думать о merge, zip или flatMap.
Привет, акарнокд, что должен делать .ignoreElement()? У меня ошибка компиляции: The method ignoreElement() is undefined for the type Single<Boolean>. Когда я его удалил, то в методе merge() возникла ошибка. Мне нужно преобразовать внутренний Single в Completable?
Правильно, в toCompletable он называется Single.
Да, мне нужно было преобразовать сингл с помощью .toCompletable() и вместо этого использовать Completable.mergeArray.
В Начиная есть раздел о зависимых потоках и продолжениях; это может помочь вам начать работу и оттуда работать.