Подписаться на Observable ничего не делает

Я пытаюсь понять, как выполняются Observables, но не могу заставить этот простой код работать.

public class RxJavaExample {
    public static void main(String[] args) {
        Observable<String> hello = Observable.fromCallable(() -> 
            getHello()).subscribeOn(Schedulers.newThread());

        hello.subscribe();

        System.out.println("End of main!");
    }

    public static String getHello() {
        System.out.println("Hello called in " + 
            Thread.currentThread().getName());
        return "Hello";
    }
}

Разве hello.subscribe() не следует выполнять getHello()?

разве getHello() не казнен?

Santanu Sur 02.07.2019 07:44

@SantanuSur Нет, никогда не звонили.

Krishnaraj 02.07.2019 08:00
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
2
247
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Это потому, что ваш основной поток завершается до того, как фоновый поток доберется до getHello. Попробуйте добавить Thread.sleep(5000) в метод main перед выходом.

В качестве альтернативы подождите, пока не будет вызвана onCompleted вашей подписки.

Обновлено: Причина, по которой программа завершается, заключается в том, что RxJava порождает потоки демон. В поисках хорошего источника я также нашел вопрос это, который, вероятно, также отвечает на него.

Я не думаю, что программа Java завершится, пока не будут завершены все потоки. Более того, я поставил точку останова в методе getHello(), он так и не был вызван.

Krishnaraj 02.07.2019 08:01

Почему бы вам не попробовать это вместо того, чтобы предположить, что вы правы и знаете, как это работает? Пожалуйста, посмотрите на мои правки. Может быть, также взглянуть на stackoverflow.com/questions/2213340/…

sfiss 02.07.2019 08:17

Java успешно завершает работу, не дожидаясь завершения потоков, поэтому у нас есть ExecutorService.awaitTermination.

LeffeBrune 02.07.2019 08:24

LeffeBrune, я изначально не знал, что RxJava создает потоки демона, пока @sfiss не указал на это. Спасибо вам обоим.

Krishnaraj 02.07.2019 08:42
Ответ принят как подходящий

@sfiss прав, это работает так, как и следовало ожидать:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaExample {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Observable<String> hello = Observable.fromCallable(() -> getHello())
        .subscribeOn(Schedulers.from(exec));

    hello.subscribe();

    System.out.println("End of main!");

    exec.shutdown();
    exec.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static String getHello() {
    System.out.println("Hello called in " + Thread.currentThread().getName());
    return "Hello";
  }
}

Со следующим выводом:

End of main!
Hello called in pool-1-thread-1

Принять это через @sfiss, поскольку этот ответ более полный.

Krishnaraj 02.07.2019 08:44

Это не распространяется на наблюдаемые только потоки и, как таковые, фактически не отвечает на вопрос заголовка. Кажется, есть путаница с потоками и наблюдаемыми

Theresa Forster 02.07.2019 09:19

@ TheresaForster, хотя технически вы правы, проблема, с которой столкнулся OP, была связана с асинхронным характером подписчика.

LeffeBrune 03.07.2019 02:11

Похоже, мы также можем сделать подписку на блокировку -- subscribe(Observable::blockingSubscribe)

Krishnaraj 06.07.2019 17:36

Возможно, вы путаетесь между потоками и наблюдаемыми,

В прошлом я использовал Observables для таймера в плагине Minecraft, у меня есть событие, которое запускается каждую минуту.

public class TimerHandler extends Observable implements Runnable{

    @Override
    public void run() {
        this.setChanged();
        this.notifyObservers();
    }
}

Таким образом, это срабатывает каждую минуту, а затем, чтобы добавить события в очередь таймера, вы просто подписываетесь на наблюдаемое, что означает, что подписанные вызовы запускаются каждую минуту.

public class PlotTimer implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        ......

чтобы подписаться, я вызываю следующее

getServer().getScheduler().scheduleAsyncRepeatingTask(this,timerHandler,1200,1200);
timerHandler.addObserver(new PayDayTimer());
timerHandler.addObserver(new ProfileTimer());
timerHandler.addObserver(new PlotTimer());

Вопрос не в java.util.Observable, а в RxJava. И мой, и принятый ответ подробно описывают, как RxJava использует потоки демона и как предотвратить преждевременное завершение работы.

sfiss 02.07.2019 09:38

@ Theresa AFAIK рекомендуемый способ - использовать теги для указания технологии вместо упоминания в заголовке.

Krishnaraj 02.07.2019 15:06

Другие вопросы по теме