У меня проблема, когда моя программа не получает результат от метода Observable
в subscribe()
.
Я пытаюсь создать простое консольное приложение, которое запускает запрос на сервер, а затем печатает, чтобы утешить результат. И как я вижу все работает нормально но я никак не могу получить результат в subsribe()
. Кажется, что приложение завершается до того, как результат возвращается методу.
Вот мой код, выполняющий запрос:
coffeeShopApi.getCoffeeShops("")
.subscribeOn(Schedulers.io())
.subscribe({
state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
}, {
it.printStackTrace()
state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
})
После выполнения этого кода программа просто завершается с кодом выхода 0. А также она запускается из основной функции в основном потоке. В чем здесь может быть проблема?
Рекомендуемое чтение: github.com/ReactiveX/RxJava#простые фоновые вычисления
@KevinWallis да, ты был прав. Но разве программа не должна ждать, пока IO закончит свою работу? Или все потоки rx работают как потоки демона?
@Ник, это зависит - см. мой ответ. Есть много разных сценариев, иногда он должен ждать, а иногда он должен просто выполнять дорогостоящую работу в фоновом режиме.
Проблема с вашим кодом заключается в том, что вы подписываетесь на другой поток в качестве текущего потока (основного). Таким образом, ваш основной поток завершается до того, как будет вызвана подписка на отдельные элементы.
Вот пример кода с трассировкой стека:
System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next),
error -> error.printStackTrace());
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + ": stop...");
Трассировки стека:
main: start...
RxCachedThreadScheduler-1: a
RxCachedThreadScheduler-1: b
RxCachedThreadScheduler-1: c
main: stop...
Показано, что наблюдаемая создается в потоке main
. Выполнение из подписки выполняется в другом потоке RxCachedThreadScheduler-1
.
Пример с выполнением в том же потоке:
System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next),
error -> error.printStackTrace());
System.out.println(Thread.currentThread().getName() + ": stop...");
Трассировки стека:
main: start...
main: a
main: b
main: c
main: stop...
В зависимости от варианта использования может потребоваться дождаться завершения выполнения потока перед дальнейшей обработкой метода, например. используя ту же нить или FutureObserver
. В других случаях использования аутсорсинг дорогостоящих вычислительных операций в другой поток является возможным решением.
На вопрос, это поток демона? Ответ да. Я прикрепил тестовый код:
String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> {
final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
System.out.println(innerMessage);
},
Throwable::printStackTrace);
Thread.sleep(1000L);
message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
Вывод стека:
main-isDaemon?false: start...
RxCachedThreadScheduler-1-isDaemon?true: a
RxCachedThreadScheduler-1-isDaemon?true: b
RxCachedThreadScheduler-1-isDaemon?true: c
main-isDaemon?false: stop...
Да я вижу. Если мы усыпим основной поток, он завершит все вычисления, и все будет в порядке. Но я до сих пор не понимаю, почему процесс не ждет завершения потока Rx. Насколько я знаю, если мы запускаем в программе несколько потоков, не являющихся демонами, тогда процесс должен дождаться завершения последнего потока, а затем завершиться сам. Так что вся эта ситуация наводит меня на мысль, что Rx использует для работы потоки демона. Это правда или нет?
@Nick Ник, я обновил свой ответ - это поток демона.
Я предполагаю, что вы подписываетесь на другой поток (Schedulers.io()), и основной поток не ждет завершения/подписки другого потока. Удалите .subscribeOn и попробуйте еще раз - тогда следует использовать основной поток