Проблема с подпиской на Observable в RxJava

У меня проблема, когда моя программа не получает результат от метода Observable в subscribe().

Я пытаюсь создать простое консольное приложение, которое запускает запрос на сервер, а затем печатает, чтобы утешить результат. И как я вижу все работает нормально но я никак не могу получить результат в subsribe(). Кажется, что приложение завершается до того, как результат возвращается методу.

Вот мой код, выполняющий запрос:

coffeeShopApi.getCoffeeShops("")
    .subscribeOn(Schedulers.io())
    .subscribe({
         state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
    }, {
         it.printStackTrace()
         state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
    })

После выполнения этого кода программа просто завершается с кодом выхода 0. А также она запускается из основной функции в основном потоке. В чем здесь может быть проблема?

Я предполагаю, что вы подписываетесь на другой поток (Schedulers.io()), и основной поток не ждет завершения/подписки другого потока. Удалите .subscribeOn и попробуйте еще раз - тогда следует использовать основной поток

Kevin Wallis 22.03.2022 18:11

Рекомендуемое чтение: github.com/ReactiveX/RxJava#простые фоновые вычисления

akarnokd 22.03.2022 19:37

@KevinWallis да, ты был прав. Но разве программа не должна ждать, пока IO закончит свою работу? Или все потоки rx работают как потоки демона?

Nick 22.03.2022 20:31

@Ник, это зависит - см. мой ответ. Есть много разных сценариев, иногда он должен ждать, а иногда он должен просто выполнять дорогостоящую работу в фоновом режиме.

Kevin Wallis 23.03.2022 21:10
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
4
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема с вашим кодом заключается в том, что вы подписываетесь на другой поток в качестве текущего потока (основного). Таким образом, ваш основной поток завершается до того, как будет вызвана подписка на отдельные элементы.

Вот пример кода с трассировкой стека:

System.out.println(Thread.currentThread().getName() + ": start...");

Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
      .subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
                 error -> error.printStackTrace());

Thread.sleep(1000L);

System.out.println(Thread.currentThread().getName() + ": stop...");

Трассировки стека:

main: start...
RxCachedThreadScheduler-1: a
RxCachedThreadScheduler-1: b
RxCachedThreadScheduler-1: c
main: stop...

Показано, что наблюдаемая создается в потоке main. Выполнение из подписки выполняется в другом потоке RxCachedThreadScheduler-1.

Пример с выполнением в том же потоке:

System.out.println(Thread.currentThread().getName() + ": start...");

Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
                 error -> error.printStackTrace());

System.out.println(Thread.currentThread().getName() + ": stop...");

Трассировки стека:

main: start...
main: a
main: b
main: c
main: stop...

В зависимости от варианта использования может потребоваться дождаться завершения выполнения потока перед дальнейшей обработкой метода, например. используя ту же нить или FutureObserver. В других случаях использования аутсорсинг дорогостоящих вычислительных операций в другой поток является возможным решением.


На вопрос, это поток демона? Ответ да. Я прикрепил тестовый код:

String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);

Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
      .subscribe(next -> {
                 final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
                 System.out.println(innerMessage);
                 },
                 Throwable::printStackTrace);

Thread.sleep(1000L);

message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);

Вывод стека:

main-isDaemon?false: start...
RxCachedThreadScheduler-1-isDaemon?true: a
RxCachedThreadScheduler-1-isDaemon?true: b
RxCachedThreadScheduler-1-isDaemon?true: c
main-isDaemon?false: stop...

Да я вижу. Если мы усыпим основной поток, он завершит все вычисления, и все будет в порядке. Но я до сих пор не понимаю, почему процесс не ждет завершения потока Rx. Насколько я знаю, если мы запускаем в программе несколько потоков, не являющихся демонами, тогда процесс должен дождаться завершения последнего потока, а затем завершиться сам. Так что вся эта ситуация наводит меня на мысль, что Rx использует для работы потоки демона. Это правда или нет?

Nick 24.03.2022 10:56

@Nick Ник, я обновил свой ответ - это поток демона.

Kevin Wallis 24.03.2022 13:38

Другие вопросы по теме