Я использую Reactive Extensions (RxJava 2) для выполнения вызова RPC к устройству Bluetooth, что приводит к входящему потоку данных, который я впоследствии анализирую, также используя Rx. В результате API представляет собой простой Flowable<DownloadedRecord>. Для этого я строю поверх Rx API библиотеки Sweetblue для Android.
Моя проблема заключается в том, что существует состояние гонки между «запросом» устройства на запуск потоковой передачи и своевременной подпиской на поток, чтобы убедиться, что пакеты не пропущены.
Я использую Completable, чтобы сначала выполнить вызов RPC, чтобы запросить начало потоковой передачи данных, andThen( readRecords ). Возникает состояние гонки, когда Sweetblue отправляет некоторые пакеты до того, как readRecords успевает подписаться на этот поток, тем самым «ломая» readRecords.
Чтобы абстрагироваться от этого конкретного сценария, возьмите следующий автономный код:
val numbers = PublishSubject.create<Int>()
var currentTotal = 0
val sumToTen = numbers
.doOnNext { currentTotal += it }
.doOnNext { println( "Produced $it" ) }
.takeUntil { currentTotal >= 10 }
.doOnComplete { println( "Produced a total of $currentTotal." ) }
Completable.fromAction { numbers.onNext( 9 ) } ) // Mimic race condition.
.andThen( sumToTen )
.subscribe { println( "Observed: $it, Current total: $currentTotal" ) }
numbers.onNext( 1 )
Вызов numbers.onNext( 9 ) имитирует состояние гонки. sumToTen никогда не наблюдает этот номер, поскольку sumToTen подписывается только на следующей строке. Таким образом, поток никогда не завершается.
После некоторого исследования я понял, что могу «решить» эту проблему, используя replay и connect.
val numbers = PublishSubject.create<Int>()
var currentTotal = 0
val sumToTen = numbers
.doOnNext { currentTotal += it }
.doOnNext { println( "Produced $it" ) }
.takeUntil { currentTotal >= 10 }
.doOnComplete { println( "Produced a total of $currentTotal." ) }
.replay( 1 ) // Always replay last item upon subscription.
Completable.fromAction { sumToTen.connect() }
.andThen( Completable.fromAction { numbers.onNext( 9 ) } )
.andThen( sumToTen )
.subscribe { println( "Observed: $it, Current total: $currentTotal" ) }
numbers.onNext( 1 )
Теперь поток sumToTen завершается, поскольку при первом подключении к sumToThen до «начала потоковой передачи данных» (onNext( 9 )) этот поток подписывается на numbers, поэтому возникают предполагаемые побочные эффекты (currentTotal). Но, '9' наблюдается только тогда, когда буфер replay достаточно велик (в данном случае это так). Например, замена replay( 1 ) на publish сделает поток полным («Создано всего 10»), но не будет наблюдать «9».
Я не полностью удовлетворен этим решением по двум причинам:
replay произволен.replay в памяти, хотя намерение состоит в том, чтобы сделать это только до тех пор, пока на них не будет подписано.Практически говоря, ни одна из них не является реальной проблемой, но с точки зрения ремонтопригодности это взлет на глазах: код не ясно передает намерение.
Есть ли лучший способ справиться с этим сценарием? Например.:
replay, который воспроизводит только для одного подписчика (таким образом, сбрасывает кеш после первого запуска).publish/connect?@ Daniel82 не что иное, как описанные решения.
Привет, у меня похожая проблема. Вы когда-нибудь находили для него решение?