Как вернуть Kotlin Coroutines Flow в Spring Reactive WebClient

Spring 5.2 принес поддержку сопрограмм Kotlin, Spring Reactive WebClient получил поддержку сопрограмм в расширениях Kotlin.

Я создал серверную службу, которая предоставляет GET /posts как поток, проверьте коды здесь.

@GetMapping("")
fun findAll(): Flow<Post> =
        postRepository.findAll()

В образце клиента я попытался использовать WebClient для использования этого API следующим образом.

@GetMapping("")
suspend fun findAll(): Flow<Post> =
        client.get()
                .uri("/posts")
                .accept(MediaType.APPLICATION_JSON)
                .awaitExchange()
                .awaitBody()

Это не удалось из-за сериализации Джексона типа Flow.

Из-за метода awaitXXX в приведенном выше выражении я должен использовать модификатор suspend для этого развлечения.

Но следующее работает, если я изменил тип тела на Any, проверьте скомпилировать коды.

GetMapping("")
suspend fun findAll() =
        client.get()
                .uri("/posts")
                .accept(MediaType.APPLICATION_JSON)
                .awaitExchange()
                .awaitBody<Any>()

После прочтения Котлинские сопрограммы документа Spring ref Flux следует преобразовать в поток сопрограмм Kotlin. Как поступить с возвращаемым типом в Flow и убрать suspend здесь?

Обновлять: тип возвращаемого значения изменен на Flow, проверьте последний исходные коды здесь, я думаю, что это может быть частью Spring 5.2.0.M2. Модификатор suspend требуется для двухэтапных операций сопрограмм в API веб-клиента, как поясняет ниже Себастьен Делёз.

3
0
3 451
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Первое, что нужно понять, это то, что возврат Flow не требует использования функций приостановки для самого метода обработчика. С Flow функции приостановки обычно изолированы в лямбда-параметрах. Но в этом (распространенном) случае использования из-за WebClient двухэтапного API (сначала получите ответ, затем получите тело) нам нужно, чтобы метод обработчика был приостановлен для awaitExchange, а затем получил тело как Flow с расширением bodyToFlow:

@GetMapping("")
suspend fun findAll() =
    client.get()
        .uri("/posts")
        .accept(MediaType.APPLICATION_JSON)
        .awaitExchange()
        .bodyToFlow<Post>()

Это поддерживается начиная с Spring Framework 5.2 M2 и Spring Boot 2.2 M3 (см. связанная проблема). См. также мой связанный подробный пост в блоге.

Почему бы не добавить bodyToFlow на биржу напрямую?

Hantsy 21.04.2019 07:33
WebClient — это двухэтапный API, потому что то, что вы делаете с телом, может зависеть от обработки другой информации, такой как статус ответа или заголовки. Я зафиксировал исправление на мастере, не стесняйтесь тестировать.
Sébastien Deleuze 25.04.2019 15:22

Другие вопросы по теме