Я тестирую, как запрашивать данные из API "jsonplaceholder" и сохранять полученные данные в локальной базе данных Postgresql. Когда я звоню от Почтальона, полученные данные возвращаются правильно с помощью «jsonplaceholder», но, как ни странно, в базе данных ничего не сохраняется. Может быть, я просто что-то упускаю в своем коде (см. ниже), я не знаю! Это приложение Spring Boot с Webflux, Kotlin и Postgres'SQL в качестве локальной базы данных. Заранее благодарю за любую помощь!
/* The config file: WebfluxConfig.kt*/
package com.example.monoSubscribe.config
import org.springframework.context.annotation.Configuration
import org.springframework.http.codec.ServerCodecConfigurer
import org.springframework.web.reactive.config.CorsRegistry
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.WebFluxConfigurer
@Configuration
@EnableWebFlux
class WebfluxConfig: WebFluxConfigurer {
override fun addCorsMappings(registry: CorsRegistry) {
registry.addMapping("api/**")
}
override fun configureHttpMessageCodecs(configurer:
ServerCodecConfigurer) {
configurer.defaultCodecs().maxInMemorySize(-1)
}
}
/* The model: Post.kt */
package com.example.monoSubscribe.model
data class Post(
val userId: Int,
val id: Int,
val title: String,
val body: String
)
/* The Repository: JsonholderRepo.kt */
package com.example.monoSubscribe.repo
import com.example.monoSubscribe.model.Post
import org.springframework.data.repository.reactive.ReactiveCrudRepository
interface JsonholderRepo: ReactiveCrudRepository<Post, Long> {
}
/* The service: AppService.kt */
package com.example.monoSubscribe.service
import com.example.monoSubscribe.model.Post
import com.example.monoSubscribe.repo.JsonholderRepo
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.util.UriBuilder
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@Service
class AppService( private val jsonholderRepo : JsonholderRepo ) {
fun createPost(postReq: Post) : Mono<Post> {
return jsonholderRepo.save(
Post(
userId = postReq.userId,
id = postReq.id,
title = postReq.title,
body = postReq.body
)
)
}
/** Call remote API (json placeholder) */
fun fetchPosts(): Flux<Post> =
fetch("/posts").bodyToFlux(Post::class.java)
fun fetch(path: String): WebClient.ResponseSpec {
val baseURL = "http://jsonplaceholder.typicode.com/"
val client = WebClient.create(baseURL)
return client.get().uri{
builder: UriBuilder ->
builder.path(path)
.build()
}.retrieve()
}
}
//The controller: AppController.kt
package com.example.monoSubscribe.controller
import com.example.monoSubscribe.model.Post
import com.example.monoSubscribe.service.AppService
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono
@RestController
@RequestMapping("/api")
class AppController(private val appService: AppService) {
@PostMapping("/posts")
fun createPost(@RequestBody postReq: Post): Mono<Post> {
return appService.createPost(postReq)
}
/** Call remote API - Json placeholder */
@GetMapping("/jsonplaceholder")
fun getData(): Mono<ResponseEntity<List<Post>>> {
val ret = appService.fetchPosts()
.take(3)
.collectList()
.map { body -> ResponseEntity.ok().body(body) }
.toMono()
ret.log().subscribe(
{
val x:List<Post> = it.body as List<Post>
for (t in x){
print(t)
appService.createPost(t)
}
},null,
{ }
)
return ret
}
}
/* The property file: application.yaml */
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/
username: postgres
password: "*********"
jpa:
properties:
hibernate:
generate_statistics: true
server:
error:
include-message:
logging:
file:
name: src/main/kotlin/de/cops/webfluxPostgresql/logging/aam.log
/* The build.gradle*/
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.7.4"
id("io.spring.dependency-management") version "1.0.14.RELEASE"
kotlin("jvm") version "1.6.21"
kotlin("plugin.spring") version "1.6.21"
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
runtimeOnly("org.postgresql:postgresql")
runtimeOnly("org.postgresql:r2dbc-postgresql")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
implementation("org.springframework.boot:spring-boot-starter-
validation:2.7.5")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}




Проблема в том, как вы пытаетесь сохранить сообщения в базе данных.
На данный момент не сохраняется, так как эта часть кода не успевает выполниться. subscribe не означает, что основной поток завершится после выполнения этих строк.
ret.log().subscribe(
{
val x:List<Post> = it.body as List<Post>
for (t in x){
print(t)
appService.createPost(t)
}
},null,
{ }
)
В вашем случае сохранение выполняется в отдельном потоке, который не блокирует основной поток. Основной поток завершается до выполнения операций сохранения. Вот почему база данных пуста.
Операции по сохранению постов должны быть частью реактивного потока, который у вас уже есть. Если вы поместите его перед сопоставлением с ResponseEntity, вы пропустите дополнительное сопоставление из объекта ответа для повторной публикации.
Что-то вроде этого должно работать.
return appService.fetchPosts()
.take(3)
.flatMap { appService.createPost(it) }
.collectList()
.map { body -> ResponseEntity.ok().body(body) }
.toMono()
Пожалуйста, отредактируйте сообщение и добавьте свою текущую реализацию в конце.
Тег mono предназначен для чего-то другого. Вместо этого используйте тег java.