Получение элементов, исходящих из Observable

Вот мой код:

  public static void main(String[] args) {
    Observable.just("747", "737", "777")
        .flatMap(
            a -> {
              try {
                Thread.sleep(5000);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              return Observable.just(a).subscribeOn(Schedulers.newThread());
            })
        .subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
  }

Насколько я понял, каждый из элементов наблюдаемого должен выполняться в отдельном потоке (что действительно происходит), а результаты будут отправлены в тот же поток, который выполнил работу (это тоже происходит). Но я не могу понять, почему поток main не завершается и не ожидает завершения фоновых потоков? Программа продолжается до тех пор, пока работает каждый из фоновых потоков.

Попробуйте проверить, запущен ли какой-либо поток в фоновом режиме, и увеличьте Thread.sleep(5000);, а также уменьшите sleep в фоновом потоке.

I_Al-thamary 21.11.2018 13:54

что произойдет, если вы добавите .dispose() после .subscribe(...)? :)

Shark 21.11.2018 14:53

@i_th единственный запущенный фоновый поток - это основной поток.

Prashant Pandey 21.11.2018 16:48
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
3
47
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Если вы посмотрите на дамп потока, вы увидите, что поток main фактически застрял на операторе sleep. Вот почему он не выходит.

Это потому, что это поток, выполняющий оператор flatMap, поэтому он застревает. Это также является причиной того, что выполнение кода занимает много времени. Вы можете легко проверить это, вставив оператор печати непосредственно перед sleep:

try {
  System.out.println(Thread.currentThread().getName() + " is sleeping");
  Thread.sleep(5000);
} catch (InterruptedException e) {
  e.printStackTrace();
}

Результат будет примерно таким:

main is sleeping
main is sleeping
Received 747 on thread RxNewThreadScheduler-1
main is sleeping
Received 737 on thread RxNewThreadScheduler-2
Received 777 on thread RxNewThreadScheduler-3

Думаю, вы хотели написать что-то вроде этого:

System.out.println(Thread.currentThread().getName()  + " is creating the observable");
Observable.just("747", "737", "777")
        .flatMap(a ->
                 Observable.fromCallable(() -> {
                  try {
                    System.out.println(Thread.currentThread().getName() + " is sleeping");
                    Thread.sleep(5000);
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  return a;
                }).subscribeOn(Schedulers.newThread())
      ).subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
System.out.println(Thread.currentThread().getName() + " is going to exit");

Вывод:

main is creating the observable
main is going to exit
RxNewThreadScheduler-3 is sleeping
RxNewThreadScheduler-2 is sleeping
RxNewThreadScheduler-1 is sleeping
Received 777 on thread RxNewThreadScheduler-3
Received 747 on thread RxNewThreadScheduler-1
Received 737 on thread RxNewThreadScheduler-1

В этой версии main закрывается сразу после создания Observable.

Другие вопросы по теме