Я пытаюсь реализовать механизм тайм-аута в функции приостановки, которая возвращает SharedFlow. Я запускал сопрограмму, используя CoroutineScope(coroutineContext).launch { }, но читал, что это не очень хорошая практика. Вместо этого предлагается сделать функцию расширением CoroutineScope. Но мне необходимо приостановить эту функцию. Как реализовать механизм тайм-аута для функции приостановки, которая должна вернуть поток?
Вот упрощенная версия моей реализации:
class Client {
sealed interface State {
object Loading: State
data class Success(val message: String): State
object Error: State
}
interface MyCallListener {
suspend fun onResult(result: State)
}
val methodListeners: MutableMap<String, MyCallListener> = mutableMapOf()
suspend inline fun call(): SharedFlow<State> {
val id = randomUUID()
val flow = MutableSharedFlow<State>()
//onResult is called elsewhere. this way, it's possible to emit a result to the flow
methodListeners[id] = object : MyCallListener {
override suspend fun onResult(result: State) {
flow.emit(result)
}
}
flow.tryEmit(State.Loading)
send(methodMessage) //this is a suspend function which uses a suspend function of ktors websocket session
CoroutineScope(coroutineContext).launch {
var latestValue: State? = null
launch {
flow.collect {
latestValue = it
}
}
delay(5000)
if (latestValue is State.Loading) {
flow.emit(State.Error)
}
}
return flow
}
fun onReceiveMessage(id: String, message: String) {
methodListeners[id]?.onResult(State.Success(message))
}
}
Мне нужно вызвать эту функцию из другой функции приостановки:
class MyUseCase(
val client: Client
) {
suspend fun foo() = client.call()
}
В настоящее время это работает, но поскольку не рекомендуется использовать CoroutineScope(coroutineContext) и я не могу сделать его расширением CoroutineScope, есть ли лучший способ сделать это?
Обновлено: Вот дополнительный контекст того, чего я пытаюсь достичь. Клиент является обработчиком сообщений WebSocket. Я отправляю сообщения WebSocket под названием «метод» с идентификатором и сохраняю экземпляр прослушивателя с этим идентификатором. Когда получено результирующее сообщение с тем же идентификатором, я получаю экземпляр прослушивателя и вызываю Listener.onResult. Я реализовал эту функцию вызова, чтобы иметь возможность получать результат метода в потоке. Но иногда сервер никогда не отправляет сообщение о результате с этим идентификатором. Таким образом, состояние застревает в состоянии загрузки. В этом отношении я хотел реализовать механизм тайм-аута.
Разве withTimeout не приостановит функцию? Я хочу, чтобы он немедленно вернул поток.
Я не знаю, что вы подразумеваете под «это приостановит функцию». withTimeout — это функция приостановки, которая оборачивает тайм-аут вокруг функции приостановки, и это то, что вы ищете, насколько я могу судить по вопросу. Никаких задержек это не добавит.
В этом коде есть ряд странностей, поэтому сложно точно сказать, чего вы пытаетесь достичь. Функция приостановки для создания потока понадобится вам редко, поскольку поток может приостанавливаться внутренне при первом выпуске. И особенно с SharedFlow. Если он общий, предположительно, вы собираетесь его кэшировать после вызова этой функции, но, поскольку он приостанавливается, вам придется кэшировать его в свойстве, допускающем значение NULL, или в свойстве lateinit, что делает его бесполезным для сбора данных из нескольких мест. Почему эта функция не может быть расширением CoroutineScope?
Вам не следует использовать CoroutineScope().launch, если вы не хотите, чтобы эта часть вашей функциональности была неотменяемой, даже если исходная сопрограмма была отменена. Но, по моему мнению, в этой ситуации всегда следует отдавать предпочтение GlobalScope, чтобы было ясно, что вы хотите, чтобы он был глобальным. В документации GlobalScope говорится, что не следует заменять его на CoroutineScope().
Чего именно вы хотите? Вы хотите вызвать какую-то функцию, и если она ничего не вернет до 500 мс, вернет ошибку?
Спасибо за ваши ответы. Я добавил больше деталей к вопросу. @Tenfour04 Tenfour04 на самом деле это работает с CoroutineScope().launch, но я не хочу его использовать по тем же причинам. Я не хотел делать это расширением CoroutineScope, потому что это экземплярная функция Клиента. Мне нужно будет вызывать его везде, используя with(client) {scope.call}, и я думаю, что это некрасиво. Это функция приостановки, поскольку она использует функцию приостановки в теле (ранее я удалил ее для примера)
Это немного помогает, но я все еще не совсем понимаю, что именно вы делаете, потому что в вашем нынешнем проекте так много странных деталей - сигнатура этой функции, почему вы генерируете горячий поток для каждого вызов функции, почему существует прослушиватель, который класс внутренне заменяет при каждом вызове функции, прерывая ранее возвращенные потоки, почему вам вообще нужна отдельная область действия сопрограммы и т. д. Я думаю, что лучший способ сообщить, что вы пытаетесь возможно, нужно объяснить, почему мой ответ ниже не соответствует вашим точным потребностям.
Я добавил больше из своей фактической реализации, чтобы было более понятно.





Попробуйте использовать withTimeoutOrNull
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class Client {
sealed interface State {
object Loading: State
object Success: State
object Error: State
}
interface MyCallListener {
suspend fun onResult(result: State)
}
var methodListener: MyCallListener? = null
suspend fun call(): SharedFlow<State> {
val flow = MutableSharedFlow<State>()
// onResult is called elsewhere. This way, it's possible to emit a result to the flow
methodListener = object : MyCallListener {
override suspend fun onResult(result: State) {
flow.emit(result)
}
}
flow.tryEmit(State.Loading)
try {
val latestValue = withTimeoutOrNull(5000) {
flow.firstOrNull { it != State.Loading }
}
if (latestValue == null) {
flow.emit(State.Error)
}
} catch (e: TimeoutCancellationException) {
flow.emit(State.Error)
}
return flow
}
}
Разве withTimeout не приостановит функцию? Я хочу, чтобы он немедленно вернул поток.
Я немного догадываюсь, что вы пытаетесь сделать. Я думаю, что SharedFlow должен быть свойством val, поскольку он является общим. Вы не хотите, чтобы несколько вызовов этой функции молча уничтожали предыдущие потоки, которые она возвращала (что ваш код и делает, каждый раз заменяя прослушиватель). И тогда слушатель тоже не должен быть публично изменчивым.
private val backingCallStateFlow = MutableSharedFlow<State>(replay = 1)
.apply { tryEmit(State.Loading) }
// use this to emit new State instead of the listener
fun updateCallState(state: State) {
backingCallStateFlow.tryEmit(state)
}
val callStateFlow = methodListenerFlow.transformLatest {
emit(it)
delay(5000L)
emit(State.Error)
}
// you can tag this with shareIn if appropriate
Поведение здесь заключается в том, что когда вы начинаете собирать поток, он выдает состояния, которые были установлены с помощью updateState(), и если одно из них становится устаревшим (не обновляется в течение 5 секунд), он выдает состояние ошибки.
На самом деле я пытаюсь имитировать HTTP-запрос с помощью веб-сокетов. Таким образом, каждый «вызов» к конечной точке должен иметь свой собственный поток: Загрузка -> Успех/Ошибка и т. д. И мне нужно собрать данные из этого потока на сайте вызова, скажем, в моей ViewModel. Поэтому функция вызова должна вернуть поток. Но идея transforLatest выглядит многообещающе. Я собираюсь попробовать это.
Вы пробовали с Timeout?