извините, если это немного грубо. После нескольких дней попыток понять и реализовать концепции асинхронности в моем приложении Quarkus я как бы зашел в тупик, поэтому мне хотелось бы получить некоторую информацию.
Основная идея потока приложения:
Я воспринимаю поток сообщений Кафы. Ключом к этим сообщениям является конечная точка соответствующего внутреннего API, который я вызываю в нашей сети с помощью простого @GET
. (редко мне приходится использовать POST
, поэтому полезная нагрузка мне действительно нужна, но пока мы оставим ее простой). У меня есть около 20 API, которые я вызываю.
У меня есть классы POJO, которые моделируют JSON-файлы, возвращаемые из этих конечных точек, и в идеале они сохраняются в соответствующей таблице Oracle после того, как я их немного изменю/обновлю.
Мне удалось заставить весь этот процесс работать идеально с помощью простого стиля транзакций/блокировок, и я был очень доволен этим. Проблема возникла, когда я обнаружил, что могу получить только около 15 000 сохраненных сообщений в час и что мне действительно нужно нажать в несколько раз больше.
Я очень запутался, потому что в руководствах по кваркусам есть много очень похожих вещей, и для меня это как бы сочетается. Я продолжаю переходить от Hibernate-Reactive-Panache к Mutiny-Primer, к Hibernate-ORM и руководствам «Начало работы-Reactive» на Quarkus.io.
Итак, вот общая архитектура, которую я использую, на основе старого приложения давно ушедшего коллеги. Я знаю, что у меня это плохо получается, так как я только начал, так что извините, если на это ужасно смотреть:
KafkaConsumer анализирует ключ, вызывает соответствующий сервис. Служба использует внедренный RestClient для вызова API, построения объекта и попытки его сохранения.
KafkaConsumer
:
package com.acme.processor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.acme.model.Message;
import com.acme.resource.KafkaKeyParser;
import com.acme.service.*;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.inject.Inject;
import java.util.Map;
@Slf4j
@ApplicationScoped
public class KafkaMessageConsumer {
@Inject
AccountContactEmailService accountEmailService;
@Incoming("test")
public Uni<Void> consume(KafkaRecord<String, Message> record) {
//I parse my keys and this works fine...
if httpEndpoint.contains("relevantInfo"):
accountContactEmailService.storeId(key1);
...
return Uni.createFrom().voidItem();
Наверное, я пытался сохранить большую часть логики и движущихся частей в сервисах: AccountContactEmailService
@ApplicationScoped
public class AccountContactEmailService {
@Inject
Mutiny.SessionFactory sessionFactory;
@RestClient
AccountContactEmailRestClient restClient;
public Uni<Void> storeId(string key) {
return restClient.getById(key)
.onItem().transformToUni(data -> {
AccountContactEmail accountContactEmail = data.getAccountContactEmail();
return sessionFactory.withTransaction(
(session, transaction) -> session.persist(accountContactEmail)
);
})
.onFailure().invoke(e -> {
throw new RuntimeException(e);
});
}
}
И мой RestClient
:
@Path("/PrettySureMyPathingWorks")
@RegisterRestClient(configKey = "rest1")
@ClientBasicAuth(username = "user", password= "pass")
public interface AccountContactEmailRestClient {
@GET
Uni<AccountContactEmailResponse> getById(@QueryParam("id") String id);
А мои сущности помечены как сущности и определены как расширяющиеся PanacheEntity
, поэтому для краткости я не буду ими делиться, но, возможно, проблема именно в этом! Если кто-нибудь ответит и нуждается в этих объектах, я тоже буду рад поделиться ими, если они понадобятся, вместе со свойствами pom/application.
Надеюсь, это имело хоть какой-то смысл, и я ценю всех, кто хочет помочь.
Метод .storeId
возвращает Uni<Void>
, который вы игнорируете.
Таким образом, приложение завершается, ничего не делая. Вот почему ничего не происходит.
Это зависит от вашего приложения, но код должен выглядеть примерно так:
@Incoming("test")
public Uni<Void> consume(KafkaRecord<String, Message> record) {
//I parse my keys and this works fine...
if (httpEndpoint.contains("relevantInfo")) {
// You need to return the value here
return accountContactEmailService
.storeId(key1)
// You can chain more uni here, if you need to. For example: .chain( ... )
;
}
...
return Uni.createFrom().voidItem();
Давиде, спасибо тебе огромное. Сейчас это кажется очевидным, но это мне очень помогает. Поскольку я все чаще использую Quarkus, я уверен, что когда-нибудь в будущем мне снова понадобится ваша помощь. Ваше здоровье.