Удалено 0 просроченных смещений за 1 миллисекунду. (kafka.coordinator.group.GroupMetadataManager)

Я новичок в кафке. я пытаюсь вызвать потребителю kafka через мою весеннюю загрузку и kafka-producer через мой терминал.

Раньше код работал нормально, но в последнее время я продолжаю получать эту форму журнала на стороне потребителя.

2019-04-22 11:56:39.415  INFO 10253 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-04-22 11:56:39.416  INFO 10253 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-04-22 11:56:39.424  INFO 10253 --- [           main] org.apache.kafka.clients.Metadata        : Cluster ID: iR8AmeB9RC-eqS7rrFyYGw
2019-04-22 11:56:39.425  INFO 10253 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=com.stellapps.rtc] Discovered group coordinator akash-Lenovo-ideapad-330-15IKB:9092 (id: 2147483647 rack: null)
2019-04-22 11:56:39.426  INFO 10253 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=com.stellapps.rtc] Revoking previously assigned partitions []
2019-04-22 11:56:39.427  INFO 10253 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=com.stellapps.rtc] (Re-)joining group
2019-04-22 11:56:42.164  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:45.183  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:48.201  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:51.218  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:54.235  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:57.256  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:00.276  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:03.294  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:06.315  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:09.334  INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing

а это со стороны производителя

2019-04-22 11:57:47,755] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-04-22 12:00:38,119] INFO [GroupCoordinator 0]: Member consumer-1-c8b9d4fd-d2e3-45a3-8f9a-2e825a9a87bd in group com.stellapps.rtc has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:38,123] INFO [GroupCoordinator 0]: Stabilized group com.stellapps.rtc generation 11 (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:48,124] INFO [GroupCoordinator 0]: Member consumer-2-8f068bb6-d78c-458f-9775-d6b13ca54b57 in group com.stellapps.rtc has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:48,125] INFO [GroupCoordinator 0]: Preparing to rebalance group com.stellapps.rtc in state PreparingRebalance with old generation 11 (__consumer_offsets-30) (reason: removing member consumer-2-8f068bb6-d78c-458f-9775-d6b13ca54b57 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:48,125] INFO [GroupCoordinator 0]: Group com.stellapps.rtc with generation 12 is now empty (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:07:47,755] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

потребитель не принимает никаких входных данных от производителя. Я попытался изменить значение опроса, но не напрасно, сообщение продолжает поступать или потребитель просто не принимает производителя формы ввода.

вот мой загрузочный код Spring.

import com.stellapps.rtc.reset.RTCInterpreter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.stellapps.rtc.reset.json.DataParser;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import javax.annotation.PostConstruct;

@Component
public class DataConsumer {

    @Autowired
    private RTCInterpreter interpret;


    private Consumer<Long, String> createConsumer(String topic) {
        final String BOOTSTRAP_SERVERS = "localhost:9092";
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "com.stellapps.rtc"); // group Id of the consumer group (if a consumer
        // group exists).
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

   public void runConsumer(String topic) throws InterruptedException {
        try {
            final Consumer<Long, String> consumer = createConsumer(topic);

            final int giveUp = 1000;
            int noRecordsCount = 0;
            while (true) {
                final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp)
                        break;
                    else
                        continue;
                }
                consumerRecords.forEach(record -> {
                    // System.out.printf(" %s\n", record.value().getClass().getName());
                    interpret.call(record.value());
                });
                consumer.commitAsync();
            }
            consumer.close();
            System.out.println("Kafka is closed");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PostConstruct
    public void init() {
        try {
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());

                    try {
                        runConsumer("consume");  
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            };
            Thread run = new Thread(r);
            run.join();
            run.start();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

пожалуйста, помогите мне.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
2 805
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

проблема заключалась в том, что я также вызывал Consumer из Application.java, поэтому у меня было 2 экземпляра Kafka consumer, но я ограничил размер своей группы до 1.

Проблема с производителем была связана со свойствами, которые я установил для производителя.

Извините за беспокойство.

Другие вопросы по теме