Недавно я обновился до Kafka 1.1.0. Я пытаюсь создать модульные тесты для потребителя kafka. Для этой цели было бы идеально, если бы модульный тест мог создать тему, которую он использует для теста. Я нашел код, который, похоже, должен делать то, что я хочу. Однако, когда я запускаю его, он выдает исключение: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly (Ljava / lang / AutoCloseable; Ljava / lang / String;) V
Вот код для создания темы, которую я нашел в сети:
@BeforeClass
public static void createTopic() {
try (final AdminClient adminClient = AdminClient.create(configure())) {
try {
// Define topic
NewTopic newTopic = new NewTopic("test-orders", 1, (short)1);
// Create topic, which is async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
// Since the call is Async, Lets wait for it to complete.
createTopicsResult.values().get(ordersTopic).get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
}
Однако при запуске возникает исключение.
java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:334)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at com.sial.notifications.topics.OrdersTopicsTests.createTopic(OrdersTopicsTests.java:162)
Единственные параметры конфигурации, которые я передаю ему, - это серверы начальной загрузки и client.id. Что я делаю неправильно? это кажется достаточно простым
Вы проверили это stackoverflow.com/questions/16946778/…




Этот немного измененный код работал у меня, когда я запускал его автономно с брокером 1.1.0:
public static void main(String[] args) {
final String ordersTopic = "test-orders";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (final AdminClient adminClient = AdminClient.create(props)) {
try {
// Define topic
NewTopic newTopic = new NewTopic(ordersTopic, 1, (short)1);
// Create topic, which is async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
// Since the call is Async, Lets wait for it to complete.
createTopicsResult.values().get(ordersTopic).get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException))
throw new RuntimeException(e.getMessage(), e);
}
}
}
Поскольку это очень похоже на ваш код и, исходя из ошибки, которую вы видите, возможно, вы не полностью разобрались с зависимостями с библиотеками Kafka? Я использовал артефакт Maven org.apache.kafka:kafka_2.12:1.1.0.
Спасибо. Проблема заключалась в том, что я использую Eclipse, и он держал в своем пути сборки jar-файлы старой версии kafka. Если бы я запускал модульные тесты из командной строки, это сработало бы.
Следуя этому комментарию, я тоже недавно обновился с Kafka-client, который был в версии 0.10, до версии 1.0 ... и столкнулся с этой ошибкой, старая версия все еще висела, как только я ее удалил, она сработала просто хорошо.
Гораздо более простой способ - просто настроить Kafka для автоматического создания любой используемой вами темы, которая еще не существует с
auto.create.topics.enable
установка для Кафки. При этом требуется дополнительный код нет для создания тем. Вы просто используете любое название темы, которое хотите, и Kafka создаст его для вас, когда вы его используете.
Вы получите эту ошибку, если ваш путь к классам настроен неправильно