Не могу отправлять сообщения в темы кафки в docker-compose

Я написал файл docker-compose.yml, чтобы можно было связать и запустить несколько служб одновременно.

У меня есть jar, который отправляет сообщения в формате JSON в тему kafka внутри контейнера, только бывает, что иногда невозможно отправить данные в тему, и я получаю это сообщение об ошибке

WARNING: Error sending message with key 1,667,770,078,000
Topic general-events not present in metadata after 60000 ms.

Вот мой файл для создания докеров:

version: "3"

services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    expose:
      - '29092'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_MIN_INSYNC_REPLICAS: '1'


  init-kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: init-kafka
    depends_on:
      - kafka
      - zookeeper
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic general-events --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:29092 --list

      kafka-console-consumer --topic general-events --from-beginning --bootstrap-server kafka:29092
      "
    
  influxdb:
    image: influxdb
    container_name: influxdb
    hostname: influxdb
    volumes:
      - influxdb-storage:/var/lib/influxdb2:rw
    env_file:
      - .env
    entrypoint: ["./entrypoint.sh"]
    ports:
      - ${DOCKER_INFLUXDB_INIT_PORT}:8086

  telegraf:
    image: telegraf
    depends_on:
      - influxdb
      - kafka
    container_name: telegraf
    links:
      - influxdb
    restart: on-failure
    env_file:
      - .env
    environment: 
      - DOCKER_INFLUXDB_INIT_ORG=earthWatch
      - DOCKER_INFLUXDB_INIT_BUCKET=telegraf
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=60b42f6f12a91425b4fc02d1dd4e44eff9231f737171da79a993055c3aa367ab
    volumes:
      - ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:rw

  java:
    image: openjdk:15
    depends_on:
      - init-kafka
    container_name: data-source
    volumes:
      - ./earthWatch.jar:/usr/src/java 
    command: bash -c "java -jar /usr/src/java earthWatch.jar"

volumes:
  influxdb-storage:

Моя конфигурация класса производителя:

public class Producer implements Runnable {

    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());
    private static final String TOPIC_NAME = "general-events";
    private KafkaProducer<Long, String> kafkaProducer = null;
    private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER";

    public Producer() {
        LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());
        Properties kafkaProps = new Properties();

        String defaultClusterValue = "172.21.0.4:29092";
        String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);
        LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);

        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");

        this.kafkaProducer = new KafkaProducer<>(kafkaProps);

    }

И когда я проверяю журналы, я знаю, что тема успешно создана:

[2022-11-06 21:35:13,759] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:13,866] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:13,969] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:14,176] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:14,578] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:15,383] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:16,591] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:17,795] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:18,900] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Creating kafka topics
Created topic general-events.
Successfully created the following topics:
general-events

Но он ничего не потребляет в конце, есть идеи, что не так?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
100
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Читая журналы, я заметил, что иногда адрес загрузочного сервера меняется, что проблематично, потому что у меня есть эта строка в моем классе производителя:

String defaultClusterValue = "172.21.0.4:29092";

Что сейчас:

String defaultClusterValue = "kafka:29092";

чтобы всегда указывать правильный адрес. Это решает проблему для меня

Другие вопросы по теме