Как связать несколько одновременных rx-java Single

Я изучаю Rx-Java2 с Vert.x, и я хотел бы связать получение успешной конфигурации с некоторыми параллельными задачами.

Я создал метод, который ищет конфигурацию и возвращает одну подписку на нее, и он отлично работает. Но сомневаюсь, где и как вызывать последующие задачи:

   public void start(Future<Void> startFuture) throws Exception {
      Single<JsonObject> configSingle = prepareConfigurationAsync();
      configSingle.subscribe(onSuccess -> {
        System.out.println(onSuccess);
        -->   Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...); 
        -->   Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...); 

    }, onError -> {
        startFuture.fail(onError);
    }));

То, как я это сделал, похоже, работает, но без параллелизма. как я мог этого добиться?

Как и где мне разместить эти подписки?

В Начиная есть раздел о зависимых потоках и продолжениях; это может помочь вам начать работу и оттуда работать.

akarnokd 03.04.2018 14:44
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
1
709
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Переход к другому источнику обычно осуществляется через flatMap. Параллельная работа часто выполняется с помощью zip или merge. В вашем случае я не думаю, что вам нужно значение внутреннего Single как часть вывода, поэтому вы можете попробовать это:

 Completable config = prepareConfigurationAsync()
     .flatMapCompletable(success ->
         System.out.println(success);
         return Completable.mergeArray (
             prepareLongAsyncTask1(success)
                 .doOnSuccess(innerSuccess -> /* ... */)
                 .toCompletable(),
             prepareLongAsyncTask2(success)
                 .doOnComplete(() ->  /* ... */)
         )
      );

  config
  .subscribe( () -> /* completed */, error -> /* error'd */);

Вау, очень интересно ... Подписка внутренних задач выполняется "автоматически" оператором слияния?

Cristiano 03.04.2018 15:39

Да. Всякий раз, когда вы чувствуете, что хотите подписаться на источник в методе subscribe или doOnX, вам нужно чаще думать о merge, zip или flatMap.

akarnokd 03.04.2018 15:42

Привет, акарнокд, что должен делать .ignoreElement()? У меня ошибка компиляции: The method ignoreElement() is undefined for the type Single<Boolean>. Когда я его удалил, то в методе merge() возникла ошибка. Мне нужно преобразовать внутренний Single в Completable?

Cristiano 03.04.2018 16:12

Правильно, в toCompletable он называется Single.

akarnokd 03.04.2018 16:21

Да, мне нужно было преобразовать сингл с помощью .toCompletable() и вместо этого использовать Completable.mergeArray.

Cristiano 03.04.2018 16:22

Другие вопросы по теме