Kafka producer-consumer не может производить / потреблять данные avro

Я написал код производителя kafka для создания данных avro, но он показывает следующую ошибку при сериализации данных:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message Caused by: java.net.UnknownHostException: sandbox-hdf.hortonworks.com atjava.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at java.net.Socket.connect(Socket.java:538) at sun.net.NetworkClient.doConnect(NetworkClient.java:180) at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) at sun.net.www.http.HttpClient.(HttpClient.java:242) at sun.net.www.http.HttpClient.New(HttpClient.java:339) at sun.net.www.http.HttpClient.New(HttpClient.java:357) at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156) at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)......

Ниже приведен мой код производителя:

package com.perfaware.kafka01;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class producerAvro {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        // setting producer properties

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        // Serialization(avro part)
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://sandbox-hdf.hortonworks.com:7788/api/v1");

        Producer<String, Customer> producer = new KafkaProducer<String, Customer>(properties);

        String topic = "topic1";

        Customer customer = Customer.newBuilder()
                .setAge(21)
                .setAutomatedEmail(false)
                .setFirstName("Manish")
                .setLastName("B.")
                .setHeight(176f)
                .setWeight(62f)
                .build();

        ProducerRecord<String, Customer> producerRecord = new ProducerRecord<String, Customer>("topic1", customer);

        System.out.println(customer);
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }
        }).get();

        producer.flush();
        producer.close();
    }
}

Я также прикрепляю свой файл pom.xml, если это помогает:

 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Kafka_Avro</groupId>
  <artifactId>Kafka_Avro_Practise</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <avro.verion>1.7.4</avro.verion>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <confluent.version>3.1.1</confluent.version>
  </properties>

 <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
 </repositories>

  <dependencies>


  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-tools</artifactId>
    <version>2.0.0</version>
</dependency>



  <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>3.1.1</version>
        </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-compiler -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-compiler</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
            <scope>provided</scope>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-mapred -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-ipc</artifactId>
    <version>1.8.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.11</version>
</dependency>

  </dependencies>

   <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Я также попытался изменить сериализатор значений:

com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer

но это не решило проблему.

0
0
410
1

Ответы 1

UnknownHostException: sandbox-hdf.hortonworks.com

Если вы используете песочницу, вам следовало отредактировать файл /etc/hosts, чтобы сделать его известным хостом.

Однако вы определенно захотите использовать сериализатор Hortonworks, если используете их реестр. Непонятно, какая у вас ошибка при его использовании, но если то же самое, это проблема с сетью, не имеющая отношения к Avro.

"value.serializer","com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer"

Кроме того, bootstrap.servers, вероятно, также потребуется разрешить экземпляры Kafka песочницы, а не только localhost.

Если вы действительно хотите использовать Confluent, хотя я не уверен, что он сработает, вам нужно будет использовать согласованные номера версий Kafka: например, Вы поместили Kafka 1.1.1, 2.0 и Confluent 3.1.1, который основан на Kafka 0.10.x.
. Аналогично с Avro - все должно быть установлено только для 1.8.1, например, хотя вам не нужны библиотеки IPC или Mapred Avro для работы вашего кода. Наверное, не компилятор.

Другие вопросы по теме