Micronaut-Kafka с auto.commit.enable=false: как зафиксировать смещение вручную

Я хочу зафиксировать сообщение, только если оно было успешно сохранено в базе данных.

Насколько я понимаю, я отключил автоматическую фиксацию для этого application.yml.

micronaut:
  application:
    name: demoGrpcKafka
  executors:
    consumer:
      type: fixed
      nThreads: 1
#kafka.bootstrap.servers: localhost:9092
kafka:
  bootstrap:
    servers: localhost:9092
  consumers:
    default:
      auto:
        commit:
          enable: false
  producers:
    #default:
    demo-producer:
      retries: 2

Потребитель

package com.tolearn.consumer

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaListener(groupId = "myGroup")
class DemoConsumer {
    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
    ){
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp ")

        //saved to database
        
        // THE ISSUE IS HERE: how commit like consumer.commitOffsets(true) ?????
    }
}

Другими словами, как мне либо commitOffset, либо commitSync(), либо любую другую альтернативу, чтобы зафиксировать сообщение вручную при использовании Micronaut-Kafka?

*** второе издание

Я вернулся к application.yaml

  consumers:
    default:
      auto:
        commit:
          enable: false

*** третье редактирование

Я попытался добавить либо io.micronaut.configuration.kafka.Acknowledgement (устарело), ​​либо импортировать io.micronaut.messaging.Acknowledgement, и любой из них вызвал

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

Кажется, мне нужно сделать что-то еще, чтобы Micronaut внедрил такой объект подтверждения. Что мне не хватает ниже?

package com.tolearn.consumer

import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
//import io.micronaut.messaging.Acknowledgement
//import io.micronaut.messaging.annotation.Header

@KafkaListener(
        groupId = "myGroup",
        offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
    @Topic("demotopic")
    fun receive(@KafkaKey key: String?,
                acknowledgement: Acknowledgement,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
                //,header: Header
    ){
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp "
                // + "header = $header"
        )

        //saved to database

        // how commit like consumer.commitOffsets(true)
        //Consumer.commitSync()

        acknowledgement.ack();
    }
}

Весь журнал

18:13:13.812 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Kafka consumer [com.tolearn.consumer.DemoConsumer@17e970dd] failed to deserialize value: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
    at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:73)
    at io.micronaut.configuration.kafka.serde.JsonSerde.deserialize(JsonSerde.java:82)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process$8(KafkaConsumerProcessor.java:396)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3564)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2899)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchNull(UTF8StreamJsonParser.java:2870)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:844)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4513)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3529)
    at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:71)
    ... 18 common frames omitted
18:13:13.812 [consumer-executor-thread-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=demo-grpc-kafka-demo-consumer, groupId=myGroup] Seeking to offset 5 for partition demotopic-0
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
1 425
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Согласно документации вы можете установить offsetStrategy в аннотации KafkaListener, например

@KafkaListener(groupId = "myGroup", offsetStrategy=OffsetStrategy.SYNC)
class DemoConsumer {
    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
[...]

к одному из следующих вариантов:

ASYNC: Asynchronously commit offsets using Consumer.commitAsync() after each batch of messages is processed.
ASYNC_PER_RECORD: Asynchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.
AUTO: Automatically commit offsets with the Consumer.poll(long) loop.
DISABLED: Do not commit offsets.
SYNC: Synchronously commit offsets using Consumer.commitSync() after each batch of messages is processed.
SYNC_PER_RECORD: Synchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.

Если я правильно понял ваш вопрос, вы хотите установить его на SYNC.

спасибо, вы правильно поняли мой вопрос. Что такое Consumer из Consumer.commitSync()? IntelliJ предложил org.apache.kafka.clients.consumer.Consumer, но пропустил commitSync. Можете ли вы предоставить код фрагмента?

Jim C 11.12.2020 10:16

Я понимаю документацию, что вам не нужно специально вызывать Consumer.commitSync() самостоятельно, но это происходит за кулисами при использовании параметра offsetStrategy в вашем KafkaListener, как показано в моем ответе.

Michael Heil 11.12.2020 10:25

вам все равно нужно отключить автоматическую фиксацию в вашем файле application.yaml.

Michael Heil 11.12.2020 10:27

Ну, это почти то же самое, что сказать: «ВЫ НЕ МОЖЕТЕ СДЕЛАТЬ ВРУЧНУЮ». Я хочу, чтобы в случае неудачи сохранения в базе данных избегать фиксации на какое-то время и через некоторое время, если все еще не удается сохранить, я повторно отправлю в Death Letter Queue, а затем Вы уверены, что невозможно удерживать фиксацию?

Jim C 11.12.2020 10:29

Несколько минут назад здесь был ответ, указывающий на micronaut-projects.github.io/micronaut-kafka/latest/guide/…. Я проверяю это руководство. Внезапно ответ исчез.

Jim C 11.12.2020 21:28

Я предполагаю, что этот ответ был ответом только по ссылке, который сообщество рассмотрело для удаления.

Michael Heil 11.12.2020 21:30

Я внимательно изучил документацию, добавив «подтверждение: подтверждение», и теперь я получаю «com.fasterxml.jackson.core.JsonParseException». Должен ли я сделать что-то еще, чтобы внедрить объект подтверждения? Пожалуйста, смотрите выше весь журнал и потребительский код

Jim C 11.12.2020 22:19

Виноват. Я исправил, изменив положение параметра подтверждения на конец после параметра msg.

Jim C 11.12.2020 23:05

Другие вопросы по теме