Я использую следующий пример, чтобы использовать потребителя Spring Kafka для чтения сообщений. Мой вариант использования требует, чтобы каждый раз при создании сообщения слушатель читал его с самого начала.
@KafkaListener(
id = "grouplistener",
topicPartitions = {
@TopicPartition(
topic = "mycompactedtopic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
)
}
)
public void onReceiving(
String payload, @Header(KafkaHeaders.OFFSET) Integer offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) {
log.info(
"Processing topic = {}, partition = {}, offset = {}, payload= {}",
topic, partition, offset, payload
);
}
Мне кажется, что я могу заставить его читать с самого начала при запуске приложения, а затем он обычно просто потребляет сообщения в будущем.
Есть ли способ заставить его каждый раз стремиться к началу?
Какова бизнес-цель этого? Суть Apache Kafka в том, что он не удаляет сообщения, поэтому вы будете каждый раз получать все записи из темы. Также у потребителя нет возможности узнать, что продукт отправил запись в тему. Промежуточное ПО для обмена сообщениями предназначено не для этого ...
@ cricket_007 да, именно так. Эта тема конкретно не будет содержать миллионы записей, это тема конфигурации, и поэтому мне нужно читать всю тему каждый раз, когда она используется.
Цель в том, и я ценю, что это не то, для чего был разработан Kafka, а то, что меня попросили реализовать, - это использовать сжатую тему с 1 разделом для хранения списка конфигураций. Затем это должно быть вызвано конечной точкой отдыха, и она должна отображать полный уникальный список конфигураций, уникальная часть отлично подходит для сортировки набора java, проблема, с которой я сталкиваюсь, заключается в том, что я реализую это с обычным потребителем , это невероятно медленно, так как он должен инициализировать потребителя и сбрасывать смещение. а со слушателем @Kafka он отображает только самое последнее сообщение.
use a compacted topic with 1 partition to hold a list of configurations. This then needs to be called by a rest end point and it should display a full unique list of the configurations
То, как вы должны реализовать это, - использовать потоки Kafka и KTable и настроить интерактивные запросы за вашим уровнем REST. Не стандартный потребитель, которому нужно перематывать себя, чтобы получить наиболее актуальное состояние системы.
Пример этого уже существует в платформе Kafka Connect, где у него есть тема конфигурации, и вы можете получить доступ только к самому последнему значению GET /connectors/name/config, и только если вы перезапустите его или масштабируете до большего количества экземпляров, он снова потребляет все сообщения . Реестр схем также является примером этого и хранит внутреннюю хэш-карту всех схем в теме _schemas и имеет REST API для чтения, вставки, удаления.
По сути, когда вы получаете новую конфигурацию для данного ключа, вы можете либо «заменить» старое значение для данного ключа совершенно новым, либо каким-то образом «объединить» старое значение с новыми данными.
Спасибо за ответ. Я исключил потоки Kafka, накладные расходы слишком высоки для того, что мне нужно, максимум 100-200 записей. Я посмотрел, как это делается в реестре схем, похоже, что он копирует тему в кеш памяти и обновляет ее по мере добавления новых сообщений. Я думаю, что буду использовать этот подход, только я скопирую их в h2 в mem db, а затем я смогу использовать стандартный spring crudrepository для взаимодействия с ним.
Не уверен, что вы имеете в виду под "накладными расходами". Одна из проблем с локальным хранением в памяти заключается в том, что если вы когда-нибудь масштабируете свое приложение, каждый экземпляр получит только частичные данные. В реестре схемы есть только одна секционированная тема и только один подходящий мастер, который принимает запросы, поэтому он может избежать этой проблемы. Если вы действительно хотите скопировать это, вот откуда берутся настоящие накладные расходы. Я считаю, что есть оболочка Spring Kafka Streams, если вы говорите о коде
Я думаю, вам следует попробовать написать ConsumerSeekAware Listener и искать смещение 0 каждый раз, когда вы читаете сообщение. Звучит как безумный обходной путь, но он может помочь. Надеюсь, что это поможет вам :-)
class Listener implements ConsumerSeekAware {
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
----Override all methods that are needed----
@KafkaListener(...)
public void listen(@Payload String message) {
this.seekCallBack.get().seek(topic, partition, 0);
}
}
}
Спасибо, но это вызовет бесконечный цикл.
@ Nimo1981 Да, это заставит потребителя работать бесконечно и прислушиваться к любым новым добавленным записям, и каждый раз, когда потребитель будет начинать с начала. Разве это не предназначено? Извините, я мог неправильно понять, что вы имеете в виду под «бесконечным циклом». Вы хотите сказать, что потребитель никогда не будет читать и просто войдет в бесконечный цикл? Не могли бы вы уточнить?
@ Nimo1981 Итак, это реализация на простой Java. Я не уверен, что это соответствует вашим потребностям. Итак, в основном я фиксирую смещение 0 (то есть, даже если я читаю из темы Kafka, я возвращаюсь к смещению, которое находится в начале.) Я не уверен, рассматривали ли вы эту реализацию, но, пожалуйста, дайте мне знать, если это это то, что вы ищете
Оставьте CommitCountObj. Это вам не нужно. Таким образом, по умолчанию offsetMap будет иметь следующую запись смещения, подобную этой,
offsetMap.put (новый TopicPartition (record.topic (), record.partition ()), новый OffsetAndMetadata (record.offset () + 1, «какое-то сообщение об успешной фиксации»));
но для вашего варианта использования я как бы модифицировал, он хорошо работает, когда потребитель не перезапускается
offsetMap.put (новый TopicPartition (record.topic (), record.partition ()), новый OffsetAndMetadata (0, «фиксация не выполнена»));
public class KafkaConsumerClass {
private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaConsumerClass.class);
private CommitCountClass commitCountobj = new CommitCountClass();
public Consumer<String, List<FeedBackConsumerClass>> createConsumer() {
Map<String, Object> consumerProps = new HashMap<String, Object>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:7070,localhost:7072");
consumerProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 50000);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "first_group-client1");
// consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "first_group");
// consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaConsumerInterceptor.class);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1500);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<String, List<FeedBackConsumerClass>>(consumerProps);
}
public void consumeRecord() {
log.info("Coming inside consumer consumer");
ArrayList<String> topicList = new ArrayList<String>();
topicList.add("topic1");
commitCountobj.setCount(0);
Consumer<String, List<FeedBackConsumerClass>> kafkaConsumer = createConsumer();
kafkaConsumer.subscribe(topicList);
log.info("after subscribing");
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
while (true) {
ConsumerRecords<String, List<FeedBackConsumerClass>> recordList = kafkaConsumer.poll(Long.MAX_VALUE);
// kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
log.info("Inside while loop:" + recordList);
if (!recordList.isEmpty()) {
recordList.forEach(record -> {
int i = 0;
System.out.println(record.toString());
// we can make the call to the API here
// call the db here or any API and process the record
// then call the code to commit
// since the commit is switched off, it becomes a developers responsibility to do the auto commit
offsetMap.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(0, "no metadata/offset commited"));
// here we are incrementing the offsetMap so that we are making sure we are storing the
// next set of offsets in the map
if (commitCountobj.getCount() % 1000 == 0) {
kafkaConsumer.commitAsync(offsetMap, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
// TODO Auto-generated method stub
if (exception != null) {
// retry it now with a sync
// possibility of error occuring here as well
// so capture the exception and exit the consumer gracefully
kafkaConsumer.commitSync();
log.error(exception.getMessage());
}
}
});
}
commitCountobj.setCount(i++);
});
}
}
}
}
Я не закрыл потребителя, но вы можете попытаться закрыть потребителя после того, как он прочитает, помните, что вам нужно реализовать механизм, запускающий потребителя каждый раз, когда производитель создает запись, потому что Apache Kafka не делает этого самостоятельно.
Помните, что каждый раз закрывать и запускать потребителя не рекомендуется. И Kafka не стремится создавать / перезапускать / запускать потребителя каждый раз, когда создается запись. Это никогда не вариант использования, которого можно достичь только с помощью Kafka. Также не рекомендуется перезапускать / создавать новых потребителей для каждой новой созданной записи из-за накладных расходов на производительность. Однако он может гарантировать, что потребитель потребляет либо с самого начала, либо с самого раннего зафиксированного смещения, но все зависит от варианта использования.
Теперь, если вы не хотите использовать старую простую Java, вы можете использовать с ней springboot. Я наткнулся на это решение, и оно сработало как шарм - stackoverflow.com/questions/40352008/…
Вот как я это буду реализовывать. Вам необходимо реализовать интерфейс ConsumerSeekAware и выполнить некоторые реализации метода onPartitionsAssigned. Вы также можете сделать seekToBegining по запросу, если отправляете переменную среды при перезапуске приложения. Но я не реализовал это!
@Service
@EnableKafka
public class Service implements ConsumerSeekAware {
@KafkaListener(topics = "${topicName}", groupId = "${groupId}")
public void listen(@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
@Payload List<String> messageBatch
) {
//do a bunch of stuff
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
String topic= Optional.ofNullable(System.getProperty(TOPIC_NAME)).orElseThrow(()->new RuntimeException("topicName needs to be set"));
assignments.keySet().stream().filter(partition->topic.equals(partition.topic()))
.forEach(partition -> callback.seekToBeginning(topic, partition.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {}
}
@KafkaListener(topicPartitions
= @TopicPartition(topic = "test", partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0")}),groupId = "foo",
containerFactory = "kafkaListenerContainerFactory")
public void listenAllMsg(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(" all msg Received Messasge in group 'foo': " + message+"RECEIVED_PARTITION_ID - "+partition);
}
в кафке 2.3.1
Каждый раз, когда отправляется сообщение, вам нужно повторно использовать старые сообщения?