Невозможно прочитать одно и то же сообщение на одну и ту же тему несколькими пользователями

Публикую сообщение для topicX. Чтобы использовать это сообщение несколькими потребителями, я написал следующий ConsumerConfig (обратите внимание, что у каждого потребителя свой groupd_id.

@EnableKafka
@Configuration
public class ConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(GROUP_ID_CONFIG, "my.groupid." + groupId);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Car> consumerFactory(String groupId) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId), new StringDeserializer(),
                new JsonDeserializer<>(Car.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Car> consumerOneKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Car> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("one"));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Car> consumerTwoKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Car> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("two"));
        return factory;
    }
}

А затем для конкретного потребителя я добавил следующую аннотацию (указал идентификатор группы и фабрику контейнеров)

@KafkaListener(id = "my.groupid.one", topics = "${app.topic.topicname}", containerFactory = "consumerOneKafkaListenerContainerFactory")

Но когда я публикую сообщение, его использует только один потребитель. Как я могу настроить кафку так, чтобы каждое опубликованное сообщение использовалось несколькими потребителями?

Я пробовал этот код с 1 разделом и разделами п (где n = количество потребителей), но у меня возникла эта проблема.

ОБНОВИТЬ

Вот мой класс продюсера темы

@Service
@Slf4j
public class TopicProducer {

    @Autowired
    private KafkaTemplate<String, TopicPayload> kafkaTemplate;

    public void send(String topicName, TopicPayload payload) {
        log.info("sending message = {} to topic = {}", payload, topicName);
        kafkaTemplate.send(topicName, payload);
    }

}

А вот и мой TopicProducerConfig

@Configuration
public class TopicProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, TopicPayload> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, TopicPayload> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Отправляю сообщение по телефону producer.send(topicName, payload).

Вы используете продюсера с ключом? Вы создаете разные типы сообщений, которые дают разные хеши / ключи?

Knight71 12.06.2018 15:01

Позвольте мне обновить вопрос, как я создаю сообщение msg.

Em Ae 12.06.2018 15:11

Я не вижу, чтобы у вас там два экземпляра фабрик-потребителей. Могли бы вы просто создать отдельные классы конфигурации для 2 потребителей, включая отдельную фабрику потребителей.

Indraneel Bende 17.06.2018 04:21
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
1
3
180
0

Другие вопросы по теме