Как программно создать тему с помощью Kafka 1.1.0

Недавно я обновился до Kafka 1.1.0. Я пытаюсь создать модульные тесты для потребителя kafka. Для этой цели было бы идеально, если бы модульный тест мог создать тему, которую он использует для теста. Я нашел код, который, похоже, должен делать то, что я хочу. Однако, когда я запускаю его, он выдает исключение: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly (Ljava / lang / AutoCloseable; Ljava / lang / String;) V

Вот код для создания темы, которую я нашел в сети:

@BeforeClass
public static void createTopic() {
   try (final AdminClient adminClient = AdminClient.create(configure())) {
        try {
            // Define topic
            NewTopic newTopic = new NewTopic("test-orders", 1, (short)1);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

Однако при запуске возникает исключение.

java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:334)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at com.sial.notifications.topics.OrdersTopicsTests.createTopic(OrdersTopicsTests.java:162)

Единственные параметры конфигурации, которые я передаю ему, - это серверы начальной загрузки и client.id. Что я делаю неправильно? это кажется достаточно простым

Вы получите эту ошибку, если ваш путь к классам настроен неправильно

OneCricketeer 11.07.2018 05:45

Вы проверили это stackoverflow.com/questions/16946778/…

Deadpool 11.07.2018 06:09
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
2
2 574
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Этот немного измененный код работал у меня, когда я запускал его автономно с брокером 1.1.0:

public static void main(String[] args) {
    final String ordersTopic = "test-orders";
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (final AdminClient adminClient = AdminClient.create(props)) {
        try {
            // Define topic
            NewTopic newTopic = new NewTopic(ordersTopic, 1, (short)1);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException))
                throw new RuntimeException(e.getMessage(), e);
        }
    }
}

Поскольку это очень похоже на ваш код и, исходя из ошибки, которую вы видите, возможно, вы не полностью разобрались с зависимостями с библиотеками Kafka? Я использовал артефакт Maven org.apache.kafka:kafka_2.12:1.1.0.

Спасибо. Проблема заключалась в том, что я использую Eclipse, и он держал в своем пути сборки jar-файлы старой версии kafka. Если бы я запускал модульные тесты из командной строки, это сработало бы.

whomer 11.07.2018 17:17

Следуя этому комментарию, я тоже недавно обновился с Kafka-client, который был в версии 0.10, до версии 1.0 ... и столкнулся с этой ошибкой, старая версия все еще висела, как только я ее удалил, она сработала просто хорошо.

NateH06 20.08.2018 23:33

Гораздо более простой способ - просто настроить Kafka для автоматического создания любой используемой вами темы, которая еще не существует с

auto.create.topics.enable

установка для Кафки. При этом требуется дополнительный код нет для создания тем. Вы просто используете любое название темы, которое хотите, и Kafka создаст его для вас, когда вы его используете.

Другие вопросы по теме