У меня есть наблюдаемое событие, которое может иметь несколько наблюдателей.
Первый наблюдатель всегда является очищающим кодом, который удаляет все остальные подписки на наблюдаемый объект (это связано с тем, что все остальные наблюдатели могут быть добавлены позже некоторыми другими частями кода).
CompositeDisposable disposable = new CompositeDisposable();
Observable<Object> eventObservable = Observable.just(1);
disposable.add(eventObservable.subscribe(event -> {
disposable.dispose();
System.out.println("Observer 1");
}));
disposable.add(eventObservable.subscribe(event -> {
System.out.println("Observer 2");
}));
disposable.add(eventObservable.subscribe(event -> {
System.out.println("Observer 3");
}));
Текущий выход:
Observer 1
Ожидаемый результат:
Observer 2
Observer 3
Observer 1
Один из способов добиться этого — добавить delay для первого наблюдателя. Но это не похоже на правильный способ сделать это.
Можно ли правильно гарантировать, что первый наблюдатель всегда вызывается после вызова других существующих наблюдателей?




Сделайте шаг назад; вам не нужно заниматься ручным удалением всех промежуточных этапов.
Observable<Object> eventObservable = Observable.just(1).take(1).share();
Это автоматически прекратится после 1 события, и это будет распространено на все подписки на eventObservable. Нет необходимости в ручной очистке.
Вы не можете задержаться в обработчике подписки, так как это нарушает асинхронную гарантию реактивных потоков.
Спасибо за ваш ответ. Но я все же хотел бы знать, возможно ли с учетом настроек, которые я предоставил, отложить выполнение первого наблюдателя. Это довольно легко сделать в RxJS из-за одного потока и цикла событий, но я не нашел способа сделать это в RxJS. Я приму ваш текущий ответ, если вы не думаете, что это возможно.