Я учусь с помощью тестового потребителя и производителя Kafka, но сталкиваюсь с ошибкой ниже.
Потребительская программа Kafka:
package kafka001;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
public class ConsumerApp {
private static Scanner in;
private static boolean stop = false;
public static void main(String[] args) throws Exception {
System.out.println(args[0] + args.length);
if (args.length != 2) {
System.err.printf("Usage: %s <topicName> <groupId>\n");
System.exit(-1);
}
in = new Scanner(System.in);
String topicName = args[0];
String groupId = args[1];
ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
consumerRunnable.start();
//System.out.println("Here");
String line = "";
while (!line.equals("exit")) {
line = in.next();
}
consumerRunnable.getKafkaConsumer().wakeup();
System.out.println("Stopping consumer now.....");
consumerRunnable.join();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private KafkaConsumer<String,String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId){
//System.out.println("inside ConsumerThread constructor");
this.topicName = topicName;
this.groupId = groupId;
}
public void run() {
//System.out.println("inside run");
// Setup Kafka producer properties
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aup7727s.unix.anz:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
// subscribe to topic
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName));
// Get/process messages from topic and print it to console
try {while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.value());
}
} catch(WakeupException ex) {
System.out.println("Exception caught " + ex.getMessage());
}finally {
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
}
Когда я компилирую код, я замечаю следующие файлы классов:
ConsumerApp $ ConsumerThread.class и ConsumerApp.class
Я создал файл jar с именем ConsumerApp.jar через eclipse, и когда я запускаю его в кластере Hadoop, я получаю ошибку noclassdeffound, как показано ниже:
java -cp ConsumerApp.jar kafka001/ConsumerApp console1 group1
or
hadoop jar ConsumerApp.jar console1 group1
Exception in thread "main" java.lang.NoClassDefFoundError: org.apache.kafka.common.errors.WakeupException
at kafka001.ConsumerApp.main(ConsumerApp.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.errors.WakeupException
at java.net.URLClassLoader.findClass(URLClassLoader.java:607)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:846)
at java.lang.ClassLoader.loadClass(ClassLoader.java:825)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:805)
... 1 more
Я использую Eclipse для компиляции, сборки maven и создания файла jar. Строка номер 24 соответствует созданию экземпляра ConsumerThread.
Я не могу решить, связано ли это с неправильным сохранением имени класса ConsumerThread (файл класса сгенерирован как ConsumerApp $ ConsumerThread.class вместо ConsumerThread.class)? или что-то, что нужно позаботиться при создании файла jar?
NoClassDefFoundError означает, что класс недоступен в вашем пути к классам. Вы указали путь к классам с помощью флага -cp, и он содержит только указанную вами jar. Вы должны, по крайней мере, добавить также и банки с хадупом. Примечание: вы можете использовать команду maven copy-dependency для сбора всех необходимых jar-файлов.




Поскольку я не могу просмотреть весь проект, я бы попробовал следующее: щелкните проект правой кнопкой мыши -> перейдите к инструментам Maven 2 -> щелкните сгенерировать артефакты (проверьте наличие обновлений). Это должно создать любые недостающие зависимости. Также не забудьте проверить другие похожие сообщения, которые могут решить вашу проблему, например это.
Вы не должны использовать
hadoop jarдля запуска кода, отличного от Hadoop, но попробуйте создать закрашенную банку. В противном случае вам понадобятся библиотеки Kafka в пути к классам вашего сервера, которых нет в Hadoop из коробки.