Я написал файл docker-compose.yml, чтобы можно было связать и запустить несколько служб одновременно.
У меня есть jar, который отправляет сообщения в формате JSON в тему kafka внутри контейнера, только бывает, что иногда невозможно отправить данные в тему, и я получаю это сообщение об ошибке
WARNING: Error sending message with key 1,667,770,078,000
Topic general-events not present in metadata after 60000 ms.
Вот мой файл для создания докеров:
version: "3"
services:
zookeeper:
image: "bitnami/zookeeper:latest"
container_name: zookeeper
hostname: zookeeper
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: confluentinc/cp-kafka:6.1.1
container_name: kafka
depends_on:
- zookeeper
ports:
- '9092:9092'
expose:
- '29092'
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_MIN_INSYNC_REPLICAS: '1'
init-kafka:
image: confluentinc/cp-kafka:6.1.1
container_name: init-kafka
depends_on:
- kafka
- zookeeper
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic general-events --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
kafka-console-consumer --topic general-events --from-beginning --bootstrap-server kafka:29092
"
influxdb:
image: influxdb
container_name: influxdb
hostname: influxdb
volumes:
- influxdb-storage:/var/lib/influxdb2:rw
env_file:
- .env
entrypoint: ["./entrypoint.sh"]
ports:
- ${DOCKER_INFLUXDB_INIT_PORT}:8086
telegraf:
image: telegraf
depends_on:
- influxdb
- kafka
container_name: telegraf
links:
- influxdb
restart: on-failure
env_file:
- .env
environment:
- DOCKER_INFLUXDB_INIT_ORG=earthWatch
- DOCKER_INFLUXDB_INIT_BUCKET=telegraf
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=60b42f6f12a91425b4fc02d1dd4e44eff9231f737171da79a993055c3aa367ab
volumes:
- ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:rw
java:
image: openjdk:15
depends_on:
- init-kafka
container_name: data-source
volumes:
- ./earthWatch.jar:/usr/src/java
command: bash -c "java -jar /usr/src/java earthWatch.jar"
volumes:
influxdb-storage:
Моя конфигурация класса производителя:
public class Producer implements Runnable {
private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());
private static final String TOPIC_NAME = "general-events";
private KafkaProducer<Long, String> kafkaProducer = null;
private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER";
public Producer() {
LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());
Properties kafkaProps = new Properties();
String defaultClusterValue = "172.21.0.4:29092";
String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);
LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");
this.kafkaProducer = new KafkaProducer<>(kafkaProps);
}
И когда я проверяю журналы, я знаю, что тема успешно создана:
[2022-11-06 21:35:13,759] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:13,866] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:13,969] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:14,176] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:14,578] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:15,383] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:16,591] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:17,795] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-06 21:35:18,900] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.21.0.5:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Creating kafka topics
Created topic general-events.
Successfully created the following topics:
general-events
Но он ничего не потребляет в конце, есть идеи, что не так?
Читая журналы, я заметил, что иногда адрес загрузочного сервера меняется, что проблематично, потому что у меня есть эта строка в моем классе производителя:
String defaultClusterValue = "172.21.0.4:29092";
Что сейчас:
String defaultClusterValue = "kafka:29092";
чтобы всегда указывать правильный адрес. Это решает проблему для меня