Запустите производитель Kafka-реактора SpringBoot 3.2.4 с собственным образом graalVM 21.
Простой проект производителя kafka-реактора SpringBoot отлично работает как неродной образ. Образ создавался много раз, работал без проблем с низкой и высокой нагрузкой.
Теперь я пытаюсь преобразовать задание в собственный образ.
Я ожидаю, что тот же процесс также будет построен и будет работать нормально.
Хотя сборка собственного образа происходит без проблем, во время выполнения кажется, что существует проблема, специфичная для собственного образа graalVM.
Трассировка стека ошибок:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:459)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
at reactor.kafka.sender.internals.ProducerFactory.createProducer(ProducerFactory.java:34)
at reactor.kafka.sender.internals.DefaultKafkaSender.lambda$new$2(DefaultKafkaSender.java:102)
at reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
at reactor.core.publisher.Flux.subscribe(Flux.java:8825)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8961)
at reactor.core.publisher.Flux.subscribe(Flux.java:8805)
at reactor.core.publisher.Flux.subscribe(Flux.java:8729)
at reactor.core.publisher.Flux.subscribe(Flux.java:8647)
at com.Service.run(Service.java:51)
at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:790)
at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:83)
at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60)
at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:88)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789)
at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:774)
at [email protected]/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at [email protected]/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357)
at [email protected]/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:510)
at [email protected]/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at [email protected]/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at [email protected]/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at [email protected]/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at [email protected]/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:774)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:341)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
at com.Application.main(Application.java:18)
at [email protected]/java.lang.invoke.LambdaForm$DMH/sa346b79c.invokeStaticInit(LambdaForm$DMH)
Caused by: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.KafkaJsonSerializer
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:385)
... 35 common frames omitted
Caused by: java.lang.NoSuchMethodException: io.confluent.kafka.serializers.KafkaJsonSerializer.<init>()
at [email protected]/java.lang.Class.checkMethod(DynamicHub.java:1075)
at [email protected]/java.lang.Class.getConstructor0(DynamicHub.java:1238)
at [email protected]/java.lang.Class.getDeclaredConstructor(DynamicHub.java:2930)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:397)
... 39 common frames omitted
Вот фрагмент конфигурации:
@Bean
public KafkaSender<String, Log> kafkaSender(final MeterRegistry meterRegistry, final ObservationRegistry observationRegistry) {
final Map<String, Object> properties = new HashMap<>();
properties.put(SSL_PROTOCOL, SSL_VALUE);
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassphrase);
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassphrase);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaJsonSerializer.class);
final SenderOptions<String, Log> senderOptions = SenderOptions.create(properties);
return KafkaSender.create(senderOptions
.producerListener(new MicrometerProducerListener(meterRegistry))
.withObservation(observationRegistry));
}
Кажется, нужно манипулировать регистраторами, некоторым config.json, чтобы исправить проблему, связанную с graalVM.
Что необходимо для устранения этой проблемы?
согласен с вами, поэтому я очень озадачен. Эта трассировка стека довольно запутанная
Пожалуйста, поделитесь, где вы настраиваете сериализатор. Из документации видно, что этого класса не существует docs.confluent.io/platform/current/schema-registry/fundamentals/…
Конечно, обновил конфигурацию производителя. И мы действительно используем KafkaJsonSerializer из confluent, а не из Spring-Kafka.
Хм. Я думаю, если вы используете класс напрямую и не передаете строку в свойства, она должна быть допустимой. Но мне все равно интересно, что будет, если попробовать весенний? Я не уверен, почему graal удалил/исключил конструктор по умолчанию.
Итак, вы правы. Если я поменяю все это с confluence на Spring-Kafka, это сработает. Так что в каком-то смысле вы правы и решили проблему. Но наша команда требует использования слияния, поэтому я все еще надеюсь получить решение.
Они должны работать одинаково для обычного JSON. Обычно Confluent полезны только при использовании с их реестром схемы. Кстати, здесь вы можете найти похожую на вашу ошибку. Возможно настроить allDeclaredConstructors
stackoverflow.com/a/57997082/2308683
Это работает. Если вы хотите добавить это в качестве ответа, я приму. К сожалению, теперь мне приходится нести этот файл, [ { "name": "io.confluent.kafka.serializers.KafkaJsonSerializer", "allDeclaredConstructors": true } ]
Kafka использует отражение для поиска различных классов по свойствам, например сериализаторам.
Вам нужно будет определить файл конфигурации, чтобы включить определенные вызовы отражения, например, для получения конструкторов.
[
{
"name": "io.confluent.kafka.serializers.KafkaJsonSerializer",
"allDeclaredConstructors": true
}
]
Вы уверены, что конфигурация правильная? Spring имеет KafkaJsonSerializer, а не Confluent... Confluent имеет сериализатор Jsonschema