Как вызвать приостановку развлечения асинхронно из области потока?

Так что я никогда не делал ничего подобного, поэтому понятия не имею, какой подход лучше.

Прямо сейчас у меня есть useCase, который возвращает поток. Для простоты я не прикрепляю свой код, но он очень похож на этот.

class UseCase constructor(
    private val userRepository: UserRepository,
    private val carRepository: CarRepository
){
    operator fun invoke(): Flow<Result<UserWithCars>>{
        return flow {
            try {
                //UserRepository under the hood returns with a RxJava Single<List<User>> list
                // But I converted to suspend fun with await() fun
                val users = userRepository.getAllUsers().await()
                fetchUserCars(users)
            }catch (e: Exception){
                emit(Result.Error())
            }
        }
    }

    private suspend fun fetchUserCars(userList: List<User>){
        userList.forEach { user->
            user.cars = try {
                //REpository also returns with a Single<>
                carRepository.getCarsByUserId(user.id).await()
            }catch (e: Exception){
                emptyList<Car>()
            }
        }
    }
}

Если я не ошибаюсь, foreach будет выполняться синхронно. Я не уверен, но думаю, что foreach можно выполнить асинхронно, но я не знаю как.

Тогда я использовал ansyc{} fun , но мне нужно было находиться внутри области действия Coroutine.

Раньше пользовательский сингл создавался в формате FlatMap, и автомобили переносились в пользовательский объект.

return users.flatMap { user->
                    fetchCars(user)
                }

В методе fetchCars() я в основном сопоставлял каждый идентификатор пользователя с Single<>, который запрашивает автомобили, поэтому я получаю List<Single>, и после этого я использовал RxJava Zip, но в этом случае я не уверен. , как работает Zip, поэтому все синглы будут выполняться друг за другом или одновременно

Так могу ли я оптимизировать его еще немного, или сейчас все в порядке?

Что вы подразумеваете под «синхронно»/«асинхронно»? Вы имеете в виду, что вам нравится отправлять несколько запросов на автомобили одновременно? Также обратите внимание, что вы никогда не добавляете в поток пользователей.

broot 21.06.2024 12:14

Да, точно, я не уверен, справится ли бэкэнд с этим, но мне интересно, как я могу сделать что-то подобное. Как я уже упоминал, код, который я приложил, является примером, а не реальным кодом.

Andreas1234 21.06.2024 12:24
1
2
52
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы правы насчет использования async, но вы, вероятно, упустили из виду тот факт, что вам нужно создать новую область сопрограммы, чтобы разрешить запуск нескольких дочерних программ. Кроме того, поскольку мы не возвращаем список автомобилей, а добавляем его непосредственно пользователю, мы можем использовать launch вместо async:

private suspend fun fetchUserCars(userList: List<User>){
    coroutineScope { 
        userList.forEach { user->
            launch {
                user.cars = try {
                    //REpository also returns with a Single<>
                    carRepository.getCarsByUserId(user.id).await()
                }catch (e: Exception){
                    emptyList<Car>()
                }
            }
        }
    }
}

Обратите внимание, что это может привести к запуску большого количества параллельных сетевых запросов. Вы можете ограничить это число, заменив coroutineScope на, например: withContext(Dispatchers.IO.limitedParallelism(8)). Однако, поскольку вы используете RxJava, это может не понадобиться, поскольку RxJava, вероятно, управляет своими собственными очередями и пулами потоков.

Спасибо, к этому времени я тоже нашел то, что пропустил, но хорошо, что вы это подтвердили.

Andreas1234 21.06.2024 12:53

Я не разбираюсь в RxJava, но разве не достаточно сначала получить все синглы, а затем дождаться их в отдельном цикле? Тогда никакие дополнительные сопрограммы не потребуются. Я добавил это как отдельный ответ, сравните.

Leviathan 21.06.2024 13:45

@Leviathan Да, я тоже не очень знаком с RxJava, но это имеет смысл.

broot 21.06.2024 13:53
Ответ принят как подходящий

Ответ Брута хорош, поскольку он позволяет вам await синглы асинхронно, но он запускает новую сопрограмму для ожидания каждого сингла.

Я не очень хорошо знаком с RxJava, но думаю, что следующее должно работать без запуска каких-либо дополнительных сопрограмм:

private suspend fun fetchUserCars(userList: List<User>) {
    userList
        .map { it to carRepository.getCarsByUserId(it.id) }
        .map {
            it.first.cars = try {
                it.second.await()
            } catch (e: Exception) {
                emptyList()
            }
        }
}

Список пользователей повторяется дважды: на первой итерации извлекаются Singles (без ожидания их значения, так что это быстро и не требуется сопрограмма), на второй итерации await вызывается для каждого Single для ожидания значения. Для этого нужна сопрограмма, но это можно сделать последовательно в той же сопрограмме, в которой запускается функция приостановки, потому что вы хотите подождать, пока все не будет выполнено.

Это похоже на то, как встроенный await работает на joinAll.

Спасибо, вообще-то я создал две версии класса варианта использования, и одна из них похожа на вашу. Итак, если я понимаю ваш код, удовольствие от карты происходит из библиотеки RxJava, оно определенно похоже на мое, может быть, оно немного лучше, чем первый подход.

Andreas1234 21.06.2024 14:42
карта принадлежит стандартной библиотеке Kotlin, как и forEach. Он возвращает новый список, элементы которого заменены значением лямбды. Он эффективно сопоставляет значения из списка с новыми значениями, отсюда и его название.
Leviathan 21.06.2024 14:54

На самом деле, если вы вызываете карту в объекте Single, она будет использовать RxJava, userList.map поступает из библиотеки Kotlin, но поскольку carRepostiory.getCarsByUserId() возвращает Single, вторая карта поступает из библиотеки Rx, но да это хорошее решение, спасибо

Andreas1234 21.06.2024 17:02

Нет, в обоих случаях map вызывается List. Внутри списка может быть Single, но операция map применяется к списку, а не к Single, поэтому в обоих случаях используется функция stdlib Kotlin map. Single также имеет функцию map, которая имеет аналогичную семантику (следовательно, то же имя), но здесь она никогда не используется и не должна использоваться.

Leviathan 21.06.2024 17:17

Другие вопросы по теме