Rxkotlin - неправильная подписка на подписку, наблюдение за тем, что тема неактивна?

У меня есть объект, который генерирует разные строки в случайные моменты времени, и мне нужно sudscribe на этот генератор, чтобы взять эти строки и предоставить их пользовательскому интерфейсу (возможно, это будет несколько подписчиков в разных действиях). Допустим, у меня есть такой код:

генератор:

class Generator {

    private var stringToGenerate = ""

    var subject: BehaviorSubject<String> = BehaviorSubject.create<String>()

    init {
        //seems like these instructions are skipped
        subject
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnNext { t -> Log.i("subject doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
                .observeOn(AndroidSchedulers.mainThread())
                .map { _ -> Log.i("subject map", Thread.currentThread().name + " " + Thread.currentThread().id) }

        //imitation of async creating of strings in separate thread
        timer("timerThread", false, 2000L, 2000L) {
            stringToGenerate = System.currentTimeMillis().toString()
            subject.onNext(stringToGenerate)
        }
    }
}

Одно из действий, которое должно потреблять сгенерированные строки:

class TestActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        setContentView(R.layout.activity_test)

        val wrongThreadObserver = object : Observer<String> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                Log.i("wrongThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
            }

            override fun onError(e: Throwable) {
            }
        }

        val generator = Generator()
        generator.subject.subscribe(wrongThreadObserver)

        //for correct work illustration
        val correctThreadObserver = object : Observer<String> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                Log.i("correctThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
            }

            override fun onError(e: Throwable) {
            }
        }

        val mainThreadSubject = BehaviorSubject.create<String>()
        mainThreadSubject
                .doOnNext { obj -> Log.i("correctThread doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(correctThreadObserver)
        mainThreadSubject.onNext("test thread")
        val handler = Handler()
        handler.postDelayed({ mainThreadSubject.onNext("test thread 2") }, 1000)
        handler.postDelayed({ mainThreadSubject.onNext("test thread 3") }, 2000)
    }
}

В этом случае правильноThreadObserver, созданный только в действии, работает нормально, но неправильныйThreadObserver продолжает работу в потоке таймера, кажется, что он игнорирует инструкции subscribeOn, ObserveOn, doOnNext в генераторе, независимо от того, где эти инструкции вызывались - в init, в потоке таймера, в действии путем получения объекта к нему от генератора - неправильныйThreadObserver все еще работает в потоке таймера. Итак, журнал:

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/wrongThreadObserver: timerThread 937

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/wrongThreadObserver: timerThread 937

I/wrongThreadObserver: timerThread 937

I/wrongThreadObserver: timerThread 937

Нет doOnNext и нет основного потока для wrongThreadObserver Что я делаю не так?

Замените subscribeOn на observeOn, и он должен работать, я думаю

EpicPandaForce 10.08.2018 14:49
0
1
214
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я нахожу следующее решение: Мы должны подписаться на Observable, а не на Subject, поэтому мы должны использовать результат метода observeOn(), а не сам объект «субъект». Если требуется множественная подписка, мы можем кэшировать результат observeOn в отдельной переменной:

//in Generator
.......
var observableInSeparateVar = subject.observeOn(AndroidSchedulers.mainThread())
.......

//in TestActivity
.......
generator.observableInSeparateVar.subscribe(wrongThreadObserver)
.......

Также мы можем вызвать observeOn() в тему и после этого подписаться:

//in TestActivity
.......
generator.subject
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(wrongThreadObserver)

Другие вопросы по теме