Я хочу зафиксировать сообщение, только если оно было успешно сохранено в базе данных.
Насколько я понимаю, я отключил автоматическую фиксацию для этого application.yml.
micronaut:
application:
name: demoGrpcKafka
executors:
consumer:
type: fixed
nThreads: 1
#kafka.bootstrap.servers: localhost:9092
kafka:
bootstrap:
servers: localhost:9092
consumers:
default:
auto:
commit:
enable: false
producers:
#default:
demo-producer:
retries: 2
Потребитель
package com.tolearn.consumer
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
@KafkaListener(groupId = "myGroup")
class DemoConsumer {
@Topic("testkey")
fun receive(@KafkaKey key: String?,
msg: String,
offset: Long,
partition: Int,
topic: String,
timestamp: Long
){
println("Key = $key " +
"msg = $msg " +
"offset = $offset " +
"partition = $partition " +
"topic = $topic " +
"timestamp = $timestamp ")
//saved to database
// THE ISSUE IS HERE: how commit like consumer.commitOffsets(true) ?????
}
}
Другими словами, как мне либо commitOffset, либо commitSync(), либо любую другую альтернативу, чтобы зафиксировать сообщение вручную при использовании Micronaut-Kafka?
*** второе издание
Я вернулся к application.yaml
consumers:
default:
auto:
commit:
enable: false
*** третье редактирование
Я попытался добавить либо io.micronaut.configuration.kafka.Acknowledgement (устарело), либо импортировать io.micronaut.messaging.Acknowledgement, и любой из них вызвал
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
Кажется, мне нужно сделать что-то еще, чтобы Micronaut внедрил такой объект подтверждения. Что мне не хватает ниже?
package com.tolearn.consumer
import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
//import io.micronaut.messaging.Acknowledgement
//import io.micronaut.messaging.annotation.Header
@KafkaListener(
groupId = "myGroup",
offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
acknowledgement: Acknowledgement,
msg: String,
offset: Long,
partition: Int,
topic: String,
timestamp: Long
//,header: Header
){
println("Key = $key " +
"msg = $msg " +
"offset = $offset " +
"partition = $partition " +
"topic = $topic " +
"timestamp = $timestamp "
// + "header = $header"
)
//saved to database
// how commit like consumer.commitOffsets(true)
//Consumer.commitSync()
acknowledgement.ack();
}
}
Весь журнал
18:13:13.812 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Kafka consumer [com.tolearn.consumer.DemoConsumer@17e970dd] failed to deserialize value: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:73)
at io.micronaut.configuration.kafka.serde.JsonSerde.deserialize(JsonSerde.java:82)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process$8(KafkaConsumerProcessor.java:396)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3564)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2899)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchNull(UTF8StreamJsonParser.java:2870)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:844)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4513)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3529)
at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:71)
... 18 common frames omitted
18:13:13.812 [consumer-executor-thread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=demo-grpc-kafka-demo-consumer, groupId=myGroup] Seeking to offset 5 for partition demotopic-0
Согласно документации вы можете установить offsetStrategy
в аннотации KafkaListener
, например
@KafkaListener(groupId = "myGroup", offsetStrategy=OffsetStrategy.SYNC)
class DemoConsumer {
@Topic("testkey")
fun receive(@KafkaKey key: String?,
[...]
к одному из следующих вариантов:
ASYNC: Asynchronously commit offsets using Consumer.commitAsync() after each batch of messages is processed.
ASYNC_PER_RECORD: Asynchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.
AUTO: Automatically commit offsets with the Consumer.poll(long) loop.
DISABLED: Do not commit offsets.
SYNC: Synchronously commit offsets using Consumer.commitSync() after each batch of messages is processed.
SYNC_PER_RECORD: Synchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.
Если я правильно понял ваш вопрос, вы хотите установить его на SYNC
.
Я понимаю документацию, что вам не нужно специально вызывать Consumer.commitSync() самостоятельно, но это происходит за кулисами при использовании параметра offsetStrategy в вашем KafkaListener, как показано в моем ответе.
вам все равно нужно отключить автоматическую фиксацию в вашем файле application.yaml.
Ну, это почти то же самое, что сказать: «ВЫ НЕ МОЖЕТЕ СДЕЛАТЬ ВРУЧНУЮ». Я хочу, чтобы в случае неудачи сохранения в базе данных избегать фиксации на какое-то время и через некоторое время, если все еще не удается сохранить, я повторно отправлю в Death Letter Queue, а затем Вы уверены, что невозможно удерживать фиксацию?
Несколько минут назад здесь был ответ, указывающий на micronaut-projects.github.io/micronaut-kafka/latest/guide/…. Я проверяю это руководство. Внезапно ответ исчез.
Я предполагаю, что этот ответ был ответом только по ссылке, который сообщество рассмотрело для удаления.
Я внимательно изучил документацию, добавив «подтверждение: подтверждение», и теперь я получаю «com.fasterxml.jackson.core.JsonParseException». Должен ли я сделать что-то еще, чтобы внедрить объект подтверждения? Пожалуйста, смотрите выше весь журнал и потребительский код
Виноват. Я исправил, изменив положение параметра подтверждения на конец после параметра msg.
спасибо, вы правильно поняли мой вопрос. Что такое Consumer из Consumer.commitSync()? IntelliJ предложил org.apache.kafka.clients.consumer.Consumer, но пропустил commitSync. Можете ли вы предоставить код фрагмента?