Выполнение параллельных вызовов API с использованием java rx.Observable

У меня есть Set<Object>, и для каждой записи в Set я должен сделать вызов API, передав его как параметр. И мне нужно обработать каждый из этих ответов и заполнить другую карту с некоторой собственной логикой.

Пример последовательного выполнения:

List<MyResponse> responses = newArrayList<>();
Set<StoreNode> nodes = // Assume we have a Set
nodes.forEach(storeNode -> responses.add(myAPI.myMethod(storeNode.getId()));
responses.forEach(response -> processResponse(response, myMap); // This is a common map & I have some custom logic to populate this map

Как я могу добиться того же с помощью Observables? Я хочу сделать эти вызовы параллельно и заполнить мою общую карту myMap

Я столкнулся с map (), flatMap () и zip (), но большинство примеров, которые я видел, были простыми, которые не выполняли вызовы API и не обрабатывали их ответ.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
0
2 306
1

Ответы 1

Это зависит от того, какую версию RxJava вы используете. Если он старше 2.0.5, то вам нужно сделать flatMap, где вы создадите еще один Observable и убедитесь, что все там размещено. См. этот ответ в StackOverflow.

В противном случае я рекомендую использовать Flowable, а затем вы можете использовать оператор parallel(), который изменяет ваш Flowable на ParallelFlowable.

Итак, вы могли бы сделать это так:

Flowable.fromIterable(nodes)
        .parallel() // you can also specify number of rails here
        .runOn(Schedulers.computation())
        .map(node -> myAPI.myMethod(node.getId()))
        .sequential()
        .subscribe(
                response -> processResponse(response, myMap),
                error -> log(error)
        );

См. Документацию Параллельные потоки для получения дополнительной информации.

Другие вопросы по теме