Не удается получить предыдущие испускаемые значения из Flow

Не удается получить предыдущие переданные значения из Flow.

class TestActivity: ComponentActivity() {
...

private val flowA = MutableStateFlow(0)

private val flowB = MutableStateFlow("")

init {
    flowB.onEach { Log.d("flowtest", "test - $it") }
        .launchIn(lifecycleScope)
}

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    flowB.value = "1"
    flowB.value = "2"
    flowB.value = "3"

    flowA.map { "ignore" }
        .onEach {
            flowB.value = "4"
            flowB.value = "5"
            flowB.value = "6"
        }
        .launchIn(lifecycleScope)

    flowA.value = 0
    
...
}

ожидать

тест - 1
тест - 2
тест - 3
тест - 4
тест - 5
тест - 6

результат
тест - 1
тест - 2
тест - 3
тест - 6

Что отсутствует в этой концепции Потока?

Как я могу получить предыдущие испускаемые значения?

Пожалуйста, обновите свой вопрос, чтобы включить реальный код, который вы используете. Предоставленный вами образец кода не может даже скомпилироваться, и даже если бы он мог, он не должен вести себя так, как вы описали. Он должен ждать collect() вечно и даже не доходить до линии, где он излучает 3.

broot 11.02.2023 09:36
0
1
53
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

На самом деле с этим кодом что-то не так, потому что свойство value является свойством stateFlow, а не свойством sharedFlow. Кроме того, вы не можете добавить ценность в namedFlow после сбора данных, если сборщик и подписчик находятся в одной и той же сопрограмме. Итак, правильный код будет:

val flowA = MutableSharedFlow<Int>()

val flowB = MutableStateFlow("")

flowA.map { "test $it" }
    .onEach { flowB.value = it }

flowB.value = "1"
flowB.value = "2"
flowB.value = "3"

flowB.collect { println(it) }

и результат будет: 3

потому что stateFlow просто сохраняет последний выпущенный

если вы хотите получить все значения, вы можете использовать SharedFlow следующим образом:

suspend fun main():Unit = coroutineScope {

val flowA = MutableSharedFlow<Int>()

launch {
    flowA.collect {
        println(it)
    }
}

launch {
    flowA.emit(1)
    flowA.emit(2)
    flowA.emit(3)
}

И вы можете проверить документацию, чтобы получить больше информации SharedFlow StateFlow

Извините за предоставление слишком мало информации и неправильный код. Меняю код и описание. не могли бы вы проверить еще раз, пожалуйста? как вы упомянули, что, поскольку stateFlow просто сохраняет последний испущенный, почему 1, 2, 3 испускаются, а 4, 5 не испускаются?

user2243604 11.02.2023 13:29
Ответ принят как подходящий

Это всего лишь предположение, так как я не собираюсь собирать проект и тестировать его.

Во-первых, нужно помнить две вещи о StateFlow.

  1. Он ограничен историей 1. Он может воспроизводить только одно значение для новых подписчиков.
  2. Это объединено. Это означает, что если коллектор работает медленнее, чем входящие выбросы, коллектор пропустит некоторые из этих выданных значений и получит только самое последнее.

Итак, глядя на ваш код, на первый взгляд я ожидаю, что вы увидите только:

test - 3
test - 6

Это связано с тем, что вы испускаете в основном потоке и собираете в основном потоке, поэтому всякий раз, когда у вас есть несколько изменений значения StateFlow в строке, я ожидаю, что только последнее, вызванное в методе, будет «прилипать», поскольку сборщик имеет ждать своей очереди освобождения основного потока, прежде чем он сможет получить свое следующее значение.

Так почему же появляются 1 и 2?

Ну, на самом деле, lifecycleScope не использует Dispatchers.Main. Он использует Dispatchers.Main.immediate, который ведет себя немного иначе. Версия immediate немедленно запускает код приостановки на месте, если вы уже находитесь в потоке Main, вместо того, чтобы сначала уступать другим сопрограммам.

Итак, я предполагаю, что когда вы меняете значение в основном потоке, но вы собираете flowB's onEach в Dispatchers.Main.immediate, у него есть возможность немедленно запустить свой onEach прямо на месте каждый раз, когда вы передаете значение flowB.

Но напомню, что на самом деле я не тестировал потоки с immediate, чтобы проверить эту гипотезу. Это единственная причина, которая, как мне кажется, объясняет такое поведение. Чтобы проверить эту теорию самостоятельно, вы можете использовать launchIn(lifecycleScope + Dispatchers.Main) в обоих местах и ​​посмотреть, исчезнут ли 1 и 2.

Да, вы правы. Я меняю область действия, как вы упомянули (lifecycleScope + Dispatchers.Main), 1 и 2 исчезли. ключевым моментом является то, что Dispatchers.Main.immediate имеет гарантию немедленного запуска кода приостановки.

user2243604 13.02.2023 07:15

Поэтому, если я хочу получить все выдаваемые значения последовательно и немедленно, я должен использовать смешанный контекст, который немедленно запускает код приостановки. Спасибо.

user2243604 13.02.2023 07:27

Ну, я бы не стал полагаться на это. Этот пример надуманный. На практике вы обычно не определяете свой поток в том же классе, в котором вы его собираете. И из-за разделения задач и модульности в вашем дизайне вы, вероятно, не хотите подчинять класс сбора набору правил. ему нужно следовать, чтобы успешно что-то собрать. Вместо этого я бы использовал SharedFlow с буфером соответствующего размера.

Tenfour04 13.02.2023 13:22

Другие вопросы по теме