Есть ли способ ограничить количество сборщиков в функции, которая возвращает поток, используя построитель потока?
У меня есть этот общедоступный метод в ViewModel
fun fetchAssets(limit: String) {
viewModelScope.launch {
withContext(Dispatchers.IO){
getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).onEach {
when (it) {
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
Этот метод вызывается в блоке init
ViewModel, но его также можно вызвать вручную в пользовательском интерфейсе.
Этот поток испускает значение каждые 10 секунд.
Репозиторий
override fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
interceptor.baseUrl = AppConfigs.ASSET_BASE_URL
emit(RequestStatus.Loading())
val domainModel = mapper.mapToDomainModel(service.getAssetItems(query, limit))
emit(RequestStatus.Success(domainModel))
} catch (e: HttpException) {
emit(RequestStatus.Failed(e))
} catch (e: IOException) {
emit(RequestStatus.Failed(e))
}
delay(10_000)
}
}
К сожалению, каждый раз, когда fetch()
вызывался из пользовательского интерфейса, я замечал, что он создает другие сборщики, поэтому в итоге может появиться множество сборщиков, что действительно плохо и неправильно.
Идея состоит в том, чтобы иметь поток, который выдает значение каждые 10 секунд, но его также можно вызывать вручную через пользовательский интерфейс для немедленного обновления данных без использования нескольких коллекторов.
@Сергей SharedFlow
Похоже, вы неправильно понимаете, что значит собирать поток, или неправильно используете операцию сбора. Собирая поток, мы имеем в виду, что наблюдаем за его изменениями. Но вы пытаетесь использовать collect()
, чтобы внести изменения в поток, что на самом деле не может работать. Он просто запускает другой поток в фоновом режиме.
Вы должны собрать поток только один раз, поэтому держите его внутри init
или там, где это подходит для вашего случая. Затем вам нужно обновить логику потока, чтобы можно было запускать перезагрузку по требованию. Есть много способов сделать это, и решение будет различаться в зависимости от того, нужно ли вам сбросить таймер при ручном обновлении или нет. Например, мы можем использовать канал для уведомления потока о необходимости перезагрузки:
val reloadChannel = Channel<Unit>(Channel.CONFLATED)
fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
...
}
withTimeoutOrNull(10.seconds) { reloadChannel.receive() } // replace `delay()` with this
}
}
fun reload() {
reloadChannel.trySend(Unit)
}
Всякий раз, когда вам нужно запустить ручную перезагрузку, не запускайте другой поток или другую операцию collect()
, а вместо этого просто вызовите reload()
. Затем поток, который уже собирается, начнет перезагружаться и будет выдавать изменения состояния.
Это решение сбрасывает таймер при ручной перезагрузке, что, по моему мнению, лучше для пользователя.
Если это решение сбрасывает таймер при ручной перезагрузке, то при вызове fun reload()
нам все равно нужно будет ждать еще 10 секунд до выполнения задачи? Поскольку это то, что я испытываю прямо сейчас после выполнения вашего кода, вызов reload()
на самом деле не сразу выполняет блок кода внутри потока. Можем ли мы игнорировать таймер для перезагрузки по требованию, потому что, если текущий таймер уже в 5 секундах, и я звоню reload()
, похоже, что у меня всего 15 секунд?
Он должен немедленно перезагружаться при вызове reload()
. См.: pl.kotl.in/XFGrGU0FB . Если это не так в вашем случае, то я думаю, что в вашем коде должно быть что-то еще, что задерживает это.
Отмечая это как ответ, хотя я нашел лучшее решение, этот ответ использует тот же подход, но по какой-то причине лучше перенести этот подход на ViewModel и использовать shareIn
или stateIn
, что сделает его более оптимизированным.
В итоге я переместил таймер в ViewModel, поскольку я могу запрашивать выборку по запросу, а также не иметь нескольких сборщиков, работающих одновременно.
private var job: Job? = null
private val _assetState = defaultMutableSharedFlow<AssetState>()
fun getAssetState() = _assetState.asSharedFlow()
init {
job = viewModelScope.launch {
while(true) {
if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
fetchAssets()
delay(10_000)
}
}
}
fun fetchAssets() {
viewModelScope.launch {
withContext(Dispatchers.IO) {
getAssetsUseCase(
AppConfigs.ASSET_BASE_URL,
AppConfigs.ASSET_PARAMS,
AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
).onEach {
when(it){
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
override fun onCleared() {
job?.cancel()
super.onCleared()
}
Пожалуйста, поправьте меня, если это запах кода.
Какой тип имеет свойство
_assetState
? ЭтоStateFlow
?