Невозможно сохранить данные в базе данных Oracle с помощью методов Quarkus Reactive

извините, если это немного грубо. После нескольких дней попыток понять и реализовать концепции асинхронности в моем приложении Quarkus я как бы зашел в тупик, поэтому мне хотелось бы получить некоторую информацию.

Основная идея потока приложения:

Я воспринимаю поток сообщений Кафы. Ключом к этим сообщениям является конечная точка соответствующего внутреннего API, который я вызываю в нашей сети с помощью простого @GET. (редко мне приходится использовать POST, поэтому полезная нагрузка мне действительно нужна, но пока мы оставим ее простой). У меня есть около 20 API, которые я вызываю.

У меня есть классы POJO, которые моделируют JSON-файлы, возвращаемые из этих конечных точек, и в идеале они сохраняются в соответствующей таблице Oracle после того, как я их немного изменю/обновлю.

Мне удалось заставить весь этот процесс работать идеально с помощью простого стиля транзакций/блокировок, и я был очень доволен этим. Проблема возникла, когда я обнаружил, что могу получить только около 15 000 сохраненных сообщений в час и что мне действительно нужно нажать в несколько раз больше.

Итак, я пытался выяснить, как я мог бы реализовать подобные вещи, используя асинхронные методы для повышения производительности, и что бы я ни пытался, я не могу сохранить какие-либо данные в таблице, которую тестирую. При запуске приложения таблица создается со столбцами, но данные никогда не помещаются в нее и не вызывается сообщение об ошибке.

Я очень запутался, потому что в руководствах по кваркусам есть много очень похожих вещей, и для меня это как бы сочетается. Я продолжаю переходить от Hibernate-Reactive-Panache к Mutiny-Primer, к Hibernate-ORM и руководствам «Начало работы-Reactive» на Quarkus.io.

Итак, вот общая архитектура, которую я использую, на основе старого приложения давно ушедшего коллеги. Я знаю, что у меня это плохо получается, так как я только начал, так что извините, если на это ужасно смотреть:

KafkaConsumer анализирует ключ, вызывает соответствующий сервис. Служба использует внедренный RestClient для вызова API, построения объекта и попытки его сохранения.

KafkaConsumer:

package com.acme.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.acme.model.Message;
import com.acme.resource.KafkaKeyParser;
import com.acme.service.*;

import jakarta.enterprise.context.ApplicationScoped;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.inject.Inject;
import java.util.Map;


@Slf4j
@ApplicationScoped
public class KafkaMessageConsumer {

    @Inject
    AccountContactEmailService accountEmailService;


    @Incoming("test")
    public Uni<Void> consume(KafkaRecord<String, Message> record) {
    //I parse my keys and this works fine...
    if httpEndpoint.contains("relevantInfo"):
        accountContactEmailService.storeId(key1);
    
    ...
    return Uni.createFrom().voidItem();

Наверное, я пытался сохранить большую часть логики и движущихся частей в сервисах: AccountContactEmailService

@ApplicationScoped
public class AccountContactEmailService {
    @Inject
    Mutiny.SessionFactory sessionFactory;

    @RestClient
    AccountContactEmailRestClient restClient;

    public Uni<Void> storeId(string key) {
        return restClient.getById(key)
                .onItem().transformToUni(data -> {
                    AccountContactEmail accountContactEmail = data.getAccountContactEmail();
                    return sessionFactory.withTransaction(
                            (session, transaction) -> session.persist(accountContactEmail)
                    );
                 })
                  .onFailure().invoke(e -> {
                      throw new RuntimeException(e);
                 });
        }
}

И мой RestClient:

@Path("/PrettySureMyPathingWorks")
@RegisterRestClient(configKey = "rest1")
@ClientBasicAuth(username = "user", password= "pass")
public interface AccountContactEmailRestClient {

    @GET
    Uni<AccountContactEmailResponse> getById(@QueryParam("id") String id);

А мои сущности помечены как сущности и определены как расширяющиеся PanacheEntity, поэтому для краткости я не буду ими делиться, но, возможно, проблема именно в этом! Если кто-нибудь ответит и нуждается в этих объектах, я тоже буду рад поделиться ими, если они понадобятся, вместе со свойствами pom/application.

Надеюсь, это имело хоть какой-то смысл, и я ценю всех, кто хочет помочь.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
55
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Метод .storeId возвращает Uni<Void>, который вы игнорируете. Таким образом, приложение завершается, ничего не делая. Вот почему ничего не происходит.

Это зависит от вашего приложения, но код должен выглядеть примерно так:

    @Incoming("test")
    public Uni<Void> consume(KafkaRecord<String, Message> record) {
    //I parse my keys and this works fine...
    if (httpEndpoint.contains("relevantInfo")) {
        // You need to return the value here
        return accountContactEmailService
              .storeId(key1)
             // You can chain more uni here, if you need to. For example: .chain( ... )
             ;
    }
    
    ...
    return Uni.createFrom().voidItem();

Давиде, спасибо тебе огромное. Сейчас это кажется очевидным, но это мне очень помогает. Поскольку я все чаще использую Quarkus, я уверен, что когда-нибудь в будущем мне снова понадобится ваша помощь. Ваше здоровье.

SaxBuddha 22.05.2024 17:02

Другие вопросы по теме