Весенняя загрузка с использованием Webflux и Kotlin: операция CRUD из подписанного Mono не работает

Я тестирую, как запрашивать данные из API "jsonplaceholder" и сохранять полученные данные в локальной базе данных Postgresql. Когда я звоню от Почтальона, полученные данные возвращаются правильно с помощью «jsonplaceholder», но, как ни странно, в базе данных ничего не сохраняется. Может быть, я просто что-то упускаю в своем коде (см. ниже), я не знаю! Это приложение Spring Boot с Webflux, Kotlin и Postgres'SQL в качестве локальной базы данных. Заранее благодарю за любую помощь!

    /* The config file: WebfluxConfig.kt*/
    package com.example.monoSubscribe.config

    import org.springframework.context.annotation.Configuration
    import org.springframework.http.codec.ServerCodecConfigurer
    import org.springframework.web.reactive.config.CorsRegistry
    import org.springframework.web.reactive.config.EnableWebFlux
    import org.springframework.web.reactive.config.WebFluxConfigurer

    @Configuration
    @EnableWebFlux
    class WebfluxConfig: WebFluxConfigurer {

    override fun addCorsMappings(registry: CorsRegistry) {
        registry.addMapping("api/**")
    }
    override fun configureHttpMessageCodecs(configurer: 
    ServerCodecConfigurer) {
        configurer.defaultCodecs().maxInMemorySize(-1)
    }
    }


    /* The model: Post.kt */
    package com.example.monoSubscribe.model

    data class Post(
    val userId: Int,
    val id: Int,
    val title: String,
    val body: String
    )
    /* The Repository: JsonholderRepo.kt */
    package com.example.monoSubscribe.repo

    import com.example.monoSubscribe.model.Post
    import org.springframework.data.repository.reactive.ReactiveCrudRepository

    interface JsonholderRepo: ReactiveCrudRepository<Post, Long> {
    }
    /* The service: AppService.kt */
    package com.example.monoSubscribe.service

    import com.example.monoSubscribe.model.Post
       import com.example.monoSubscribe.repo.JsonholderRepo
    import org.springframework.stereotype.Service
    import org.springframework.web.reactive.function.client.WebClient
    import org.springframework.web.util.UriBuilder
    import reactor.core.publisher.Flux
    import reactor.core.publisher.Mono

    @Service
    class AppService( private val jsonholderRepo : JsonholderRepo ) {

    fun createPost(postReq: Post) : Mono<Post> {
        return jsonholderRepo.save(
                Post(
                    userId = postReq.userId,
                    id = postReq.id,
                    title = postReq.title,
                    body = postReq.body
                )
            )
    }

    /** Call remote API (json placeholder) */
    fun fetchPosts(): Flux<Post> = 
    fetch("/posts").bodyToFlux(Post::class.java)
    fun fetch(path: String): WebClient.ResponseSpec {
        val baseURL = "http://jsonplaceholder.typicode.com/"
        val client = WebClient.create(baseURL)
        return client.get().uri{
                builder: UriBuilder ->
            builder.path(path)
                .build()
        }.retrieve()
    }
    }

    //The controller: AppController.kt
    package com.example.monoSubscribe.controller

    import com.example.monoSubscribe.model.Post
    import com.example.monoSubscribe.service.AppService
    import org.springframework.http.ResponseEntity
    import org.springframework.web.bind.annotation.*
    import reactor.core.publisher.Mono
    import reactor.kotlin.core.publisher.toMono


    @RestController
    @RequestMapping("/api")
    class AppController(private val appService: AppService) {


    @PostMapping("/posts")
    fun createPost(@RequestBody postReq: Post): Mono<Post> {
        return appService.createPost(postReq)
    }

    /** Call remote API - Json placeholder  */
    @GetMapping("/jsonplaceholder")
    fun getData(): Mono<ResponseEntity<List<Post>>> {
        val ret =  appService.fetchPosts()
            .take(3)
            .collectList()
            .map { body -> ResponseEntity.ok().body(body) }
            .toMono()
        ret.log().subscribe(
            {
                val x:List<Post> = it.body as List<Post>
                for (t in x){
                    print(t)
                    appService.createPost(t)
                }
            },null,
            { }
        )
        return ret
    }

    }

    /* The property file: application.yaml */
    spring:
      r2dbc:
        url: r2dbc:postgresql://localhost:5432/
        username: postgres
        password: "*********"
      jpa:
        properties:
          hibernate:
            generate_statistics: true
    server:
      error:
        include-message:
    logging:
      file:
        name: src/main/kotlin/de/cops/webfluxPostgresql/logging/aam.log
    /* The build.gradle*/
    import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

    plugins {
    id("org.springframework.boot") version "2.7.4"
    id("io.spring.dependency-management") version "1.0.14.RELEASE"
    kotlin("jvm") version "1.6.21"
    kotlin("plugin.spring") version "1.6.21"
    }

    group = "com.example"
    version = "0.0.1-SNAPSHOT"
    java.sourceCompatibility = JavaVersion.VERSION_11

    repositories {
    mavenCentral()
    }

    dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    runtimeOnly("org.postgresql:postgresql")
    runtimeOnly("org.postgresql:r2dbc-postgresql")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    implementation("org.springframework.boot:spring-boot-starter- 
    validation:2.7.5")
    }

      tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
    }

    tasks.withType<Test> {
    useJUnitPlatform()
    }

Тег mono предназначен для чего-то другого. Вместо этого используйте тег java.

Lex Li 04.11.2022 02:52
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
1
139
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема в том, как вы пытаетесь сохранить сообщения в базе данных.

На данный момент не сохраняется, так как эта часть кода не успевает выполниться. subscribe не означает, что основной поток завершится после выполнения этих строк.

       ret.log().subscribe(
            {
                val x:List<Post> = it.body as List<Post>
                for (t in x){
                    print(t)
                    appService.createPost(t)
                }
            },null,
            { }
        )

В вашем случае сохранение выполняется в отдельном потоке, который не блокирует основной поток. Основной поток завершается до выполнения операций сохранения. Вот почему база данных пуста.

Операции по сохранению постов должны быть частью реактивного потока, который у вас уже есть. Если вы поместите его перед сопоставлением с ResponseEntity, вы пропустите дополнительное сопоставление из объекта ответа для повторной публикации.

Что-то вроде этого должно работать.

        return appService.fetchPosts()
            .take(3)
            .flatMap { appService.createPost(it) }
            .collectList()
            .map { body -> ResponseEntity.ok().body(body) }
            .toMono()

Пожалуйста, отредактируйте сообщение и добавьте свою текущую реализацию в конце.

Ice 05.11.2022 22:38

Другие вопросы по теме