Кэширование элементов от наблюдаемых до первой подписки для борьбы с состоянием гонки

Я использую Reactive Extensions (RxJava 2) для выполнения вызова RPC к устройству Bluetooth, что приводит к входящему потоку данных, который я впоследствии анализирую, также используя Rx. В результате API представляет собой простой Flowable<DownloadedRecord>. Для этого я строю поверх Rx API библиотеки Sweetblue для Android.

Моя проблема заключается в том, что существует состояние гонки между «запросом» устройства на запуск потоковой передачи и своевременной подпиской на поток, чтобы убедиться, что пакеты не пропущены.

Я использую Completable, чтобы сначала выполнить вызов RPC, чтобы запросить начало потоковой передачи данных, andThen( readRecords ). Возникает состояние гонки, когда Sweetblue отправляет некоторые пакеты до того, как readRecords успевает подписаться на этот поток, тем самым «ломая» readRecords.

Чтобы абстрагироваться от этого конкретного сценария, возьмите следующий автономный код:

val numbers = PublishSubject.create<Int>()

var currentTotal = 0
val sumToTen = numbers
    .doOnNext { currentTotal += it }
    .doOnNext { println( "Produced $it" ) }
    .takeUntil { currentTotal >= 10 }
    .doOnComplete { println( "Produced a total of $currentTotal." ) }

Completable.fromAction { numbers.onNext( 9 ) } ) // Mimic race condition.
    .andThen( sumToTen )
    .subscribe { println( "Observed: $it, Current total: $currentTotal" ) }

numbers.onNext( 1 )

Вызов numbers.onNext( 9 ) имитирует состояние гонки. sumToTen никогда не наблюдает этот номер, поскольку sumToTen подписывается только на следующей строке. Таким образом, поток никогда не завершается.

После некоторого исследования я понял, что могу «решить» эту проблему, используя replay и connect.

val numbers = PublishSubject.create<Int>()

var currentTotal = 0
val sumToTen = numbers
    .doOnNext { currentTotal += it }
    .doOnNext { println( "Produced $it" ) }
    .takeUntil { currentTotal >= 10 }
    .doOnComplete { println( "Produced a total of $currentTotal." ) }
    .replay( 1 ) // Always replay last item upon subscription.

Completable.fromAction { sumToTen.connect() }
    .andThen( Completable.fromAction { numbers.onNext( 9 ) } )
    .andThen( sumToTen )
    .subscribe { println( "Observed: $it, Current total: $currentTotal" ) }

numbers.onNext( 1 )

Теперь поток sumToTen завершается, поскольку при первом подключении к sumToThen до «начала потоковой передачи данных» (onNext( 9 )) этот поток подписывается на numbers, поэтому возникают предполагаемые побочные эффекты (currentTotal). Но, '9' наблюдается только тогда, когда буфер replay достаточно велик (в данном случае это так). Например, замена replay( 1 ) на publish сделает поток полным («Создано всего 10»), но не будет наблюдать «9».

Я не полностью удовлетворен этим решением по двум причинам:

  1. Это просто сводит к минимуму вероятность возникновения состояния гонки. Размер буфера replay произволен.
  2. Это всегда будет хранить указанное количество элементов в replay в памяти, хотя намерение состоит в том, чтобы сделать это только до тех пор, пока на них не будет подписано.

Практически говоря, ни одна из них не является реальной проблемой, но с точки зрения ремонтопригодности это взлет на глазах: код не ясно передает намерение.

Есть ли лучший способ справиться с этим сценарием? Например.:

  • Оператор replay, который воспроизводит только для одного подписчика (таким образом, сбрасывает кеш после первого запуска).
  • Совершенно другой подход, чем тот, который я исследовал здесь с publish/connect?

Привет, у меня похожая проблема. Вы когда-нибудь находили для него решение?

Daniel82 02.06.2021 17:36

@ Daniel82 не что иное, как описанные решения.

Steven Jeuris 03.06.2021 09:13
2
2
170
0

Другие вопросы по теме