Как настроить политику хранения тем кафки во время создания с помощью Spring?

Мне нужно настроить политику хранения определенной темы во время создания. Я пытался найти решение, я мог найти только команду изменения уровня команды, как показано ниже.

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000

Может ли кто-нибудь сообщить мне, как настроить его во время создания, что-то вроде конфигурации файла xml или свойств в spring-mvc.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
9
0
5 324
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Я думаю, вы могли бы использовать клиент администратора (https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html) для этого. Вы можете создать экземпляр клиента администратора в своем приложении и использовать команду создания или изменения темы для управления конфигурациями темы, включая сохранение.

Ответ принят как подходящий

Spring Kafka позволяет создавать новые темы, объявляя @Beans в контексте вашего приложения. Для этого потребуется bean-компонент типа KafkaAdmin в контексте приложения, который будет создан автоматически при использовании Spring Boot. Вы можете определить свою тему следующим образом:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}

Если вы не используете Spring Boot, вам дополнительно потребуется определить bean-компонент KafkaAdmin:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}

Если вы хотите отредактировать конфигурацию существующей темы, вам придется использовать AdminClient, вот фрагмент, чтобы изменить retention.ms на уровне темы:

Map<String, Object> config = new HashMap<>();                
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
                         
AdminClient client = AdminClient.create(config);
                         
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
            
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
configs.put(resource, Arrays.asList(op));

AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
        alterConfigsResult.all();

Конфигурация может быть настроена автоматически с помощью этого @PostConstruct метода, который использует NewTopic bean-компоненты.


    @Autowired
    private Set<NewTopic> topics;

    @PostConstruct
    public void reconfigureTopics() throws ExecutionException, InterruptedException {

        try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
            adminClient.incrementalAlterConfigs(topics.stream()
                .filter(topic -> topic.configs() != null)
                .collect(Collectors.toMap(
                    topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                    topic -> topic.configs().entrySet()
                        .stream()
                        .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                        .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                        .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                        .collect(Collectors.toList())
                )))
                .all()
                .get();
        }

    }

Спасибо за образец кода. @Sergi, выдает ли изменитьКонфигурсРезультат.все() какое-либо исключение, если мы пытаемся изменить сохранение несуществующей темы? Как мы можем узнать, изменяем ли мы существующую тему?

Rachan R K 13.06.2019 07:42

Обновление конфигурации существующей темы не вызывает никаких исключений. Вы можете использовать метод descriptionConfigs, чтобы получить текущую конфигурацию существующей темы.

Sergi Almar 13.06.2019 09:57

Чтобы создать тему с помощью AdminClient программно с указанным временем хранения, сделайте следующее:

NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
topic.configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString()));
adminClient.createTopics(List.of(topic));

Другие вопросы по теме