Вот мой код:
public static void main(String[] args) {
Observable.just("747", "737", "777")
.flatMap(
a -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Observable.just(a).subscribeOn(Schedulers.newThread());
})
.subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
}
Насколько я понял, каждый из элементов наблюдаемого должен выполняться в отдельном потоке (что действительно происходит), а результаты будут отправлены в тот же поток, который выполнил работу (это тоже происходит). Но я не могу понять, почему поток main не завершается и не ожидает завершения фоновых потоков? Программа продолжается до тех пор, пока работает каждый из фоновых потоков.
что произойдет, если вы добавите .dispose() после .subscribe(...)? :)
@i_th единственный запущенный фоновый поток - это основной поток.




Если вы посмотрите на дамп потока, вы увидите, что поток main фактически застрял на операторе sleep. Вот почему он не выходит.
Это потому, что это поток, выполняющий оператор flatMap, поэтому он застревает. Это также является причиной того, что выполнение кода занимает много времени. Вы можете легко проверить это, вставив оператор печати непосредственно перед sleep:
try {
System.out.println(Thread.currentThread().getName() + " is sleeping");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Результат будет примерно таким:
main is sleeping
main is sleeping
Received 747 on thread RxNewThreadScheduler-1
main is sleeping
Received 737 on thread RxNewThreadScheduler-2
Received 777 on thread RxNewThreadScheduler-3
Думаю, вы хотели написать что-то вроде этого:
System.out.println(Thread.currentThread().getName() + " is creating the observable");
Observable.just("747", "737", "777")
.flatMap(a ->
Observable.fromCallable(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is sleeping");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return a;
}).subscribeOn(Schedulers.newThread())
).subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
System.out.println(Thread.currentThread().getName() + " is going to exit");
Вывод:
main is creating the observable
main is going to exit
RxNewThreadScheduler-3 is sleeping
RxNewThreadScheduler-2 is sleeping
RxNewThreadScheduler-1 is sleeping
Received 777 on thread RxNewThreadScheduler-3
Received 747 on thread RxNewThreadScheduler-1
Received 737 on thread RxNewThreadScheduler-1
В этой версии main закрывается сразу после создания Observable.
Попробуйте проверить, запущен ли какой-либо поток в фоновом режиме, и увеличьте
Thread.sleep(5000);, а также уменьшитеsleepв фоновом потоке.