Я пытаюсь понять, как выполняются Observables, но не могу заставить этот простой код работать.
public class RxJavaExample {
public static void main(String[] args) {
Observable<String> hello = Observable.fromCallable(() ->
getHello()).subscribeOn(Schedulers.newThread());
hello.subscribe();
System.out.println("End of main!");
}
public static String getHello() {
System.out.println("Hello called in " +
Thread.currentThread().getName());
return "Hello";
}
}
Разве hello.subscribe() не следует выполнять getHello()?
@SantanuSur Нет, никогда не звонили.




Это потому, что ваш основной поток завершается до того, как фоновый поток доберется до getHello. Попробуйте добавить Thread.sleep(5000) в метод main перед выходом.
В качестве альтернативы подождите, пока не будет вызвана onCompleted вашей подписки.
Обновлено: Причина, по которой программа завершается, заключается в том, что RxJava порождает потоки демон. В поисках хорошего источника я также нашел вопрос это, который, вероятно, также отвечает на него.
Я не думаю, что программа Java завершится, пока не будут завершены все потоки. Более того, я поставил точку останова в методе getHello(), он так и не был вызван.
Почему бы вам не попробовать это вместо того, чтобы предположить, что вы правы и знаете, как это работает? Пожалуйста, посмотрите на мои правки. Может быть, также взглянуть на stackoverflow.com/questions/2213340/…
Java успешно завершает работу, не дожидаясь завершения потоков, поэтому у нас есть ExecutorService.awaitTermination.
LeffeBrune, я изначально не знал, что RxJava создает потоки демона, пока @sfiss не указал на это. Спасибо вам обоим.
@sfiss прав, это работает так, как и следовало ожидать:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaExample {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Observable<String> hello = Observable.fromCallable(() -> getHello())
.subscribeOn(Schedulers.from(exec));
hello.subscribe();
System.out.println("End of main!");
exec.shutdown();
exec.awaitTermination(10, TimeUnit.SECONDS);
}
public static String getHello() {
System.out.println("Hello called in " + Thread.currentThread().getName());
return "Hello";
}
}
Со следующим выводом:
End of main!
Hello called in pool-1-thread-1
Принять это через @sfiss, поскольку этот ответ более полный.
Это не распространяется на наблюдаемые только потоки и, как таковые, фактически не отвечает на вопрос заголовка. Кажется, есть путаница с потоками и наблюдаемыми
@ TheresaForster, хотя технически вы правы, проблема, с которой столкнулся OP, была связана с асинхронным характером подписчика.
Похоже, мы также можем сделать подписку на блокировку -- subscribe(Observable::blockingSubscribe)
Возможно, вы путаетесь между потоками и наблюдаемыми,
В прошлом я использовал Observables для таймера в плагине Minecraft, у меня есть событие, которое запускается каждую минуту.
public class TimerHandler extends Observable implements Runnable{
@Override
public void run() {
this.setChanged();
this.notifyObservers();
}
}
Таким образом, это срабатывает каждую минуту, а затем, чтобы добавить события в очередь таймера, вы просто подписываетесь на наблюдаемое, что означает, что подписанные вызовы запускаются каждую минуту.
public class PlotTimer implements Observer {
@Override
public void update(Observable o, Object arg) {
......
чтобы подписаться, я вызываю следующее
getServer().getScheduler().scheduleAsyncRepeatingTask(this,timerHandler,1200,1200);
timerHandler.addObserver(new PayDayTimer());
timerHandler.addObserver(new ProfileTimer());
timerHandler.addObserver(new PlotTimer());
Вопрос не в java.util.Observable, а в RxJava. И мой, и принятый ответ подробно описывают, как RxJava использует потоки демона и как предотвратить преждевременное завершение работы.
@ Theresa AFAIK рекомендуемый способ - использовать теги для указания технологии вместо упоминания в заголовке.
разве
getHello()не казнен?