В моем проекте мне нужно подключиться к двум разным брокерам Kafka.
Мой application.yaml выглядит примерно так:
spring:
cloud:
function:
definition: orderCreatedListener;orderProcessedListener
stream:
bindings:
orderCreatedProducer-out-0:
destination: order-created
binder: kafka-one
orderCreatedListener-in-0:
destination: order-created
group: spot
binder: kafka-one
orderCreatedListener-out-0:
destination: order-processed
binder: kafka-two
orderProcessedListener-in-0:
destination: order-processed
group: spot
binder: kafka-two
kafka:
binder:
auto-create-topics: true
bindings:
orderCreatedListener-in-0:
consumer:
enableDlq: true
dlqName: order-created-dlq
autoCommitOnError: true
autoCommitOffset: true
orderProcessedListener-in-0:
consumer:
enableDlq: true
dlqName: order-processed-dlq
autoCommitOnError: true
autoCommitOffset: true
binders:
kafka-one:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
kafka-two:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
Но когда я запустил приложение, это не сработало, это вызвало следующую ошибку:
2024-03-05T23:35:48.473-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00 INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
Я хочу разделить темы Kafka на два кластера:
order-created и order-created-dlqorder-processed и order-processed-dlqЯ использую:
У меня есть два кластера Kafka, которые нормально работают в моей среде разработки с док-контейнерами: один доступен через порт 9092, а другой — через порт 9093.
Как это отрегулировать?
Похоже, возникли проблемы с подключением Kafka.
@sobychacko Спасибо за ваш ответ, я использовал именно этот пример в качестве образца, но у меня это не работает.
@sobychacko У вас есть еще предложения?
Не то чтобы я мог об этом подумать. Если вы сможете собрать свое приложение в виде небольшого примера на GH и сценарии создания докера для Kafka, я смогу попробовать это со своей стороны и посмотреть, что происходит.
Это репозиторий моего приложения: github.com/jonathanmdr/Spot/tree/feature/multiple-kafka-brokers
Пожалуйста, взгляните на мой ответ с некоторыми выводами.
Я думаю, что есть некоторые проблемы с вашей настройкой Kafka, определенной здесь . Вместо этого я использовал те, которые указаны в Spring Cloud Stream, предоставленные для нужд тестирования. Это дает кластер из 3 узлов с localhost:9091, localhost:9092 и localhost:9093. Вы используете те, которые работают на 9092 и 9093, в своем приложении в ветке feature/multiple-kafka-brokers. Поскольку эти узлы используются в качестве брокеров Kafka, я не вижу никаких ошибок при запуске приложения. При запуске команды CURL, указанной в README, я вижу в консоли следующий вывод:
2024-03-07T15:20:55.577-05:00 INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener : {"order_id":"89c6ea7a-6fe5-4fa9-91c6-733a5f603b10","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":1000.00,"status":"REJECTED"}
2024-03-07T15:22:25.885-05:00 INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener : {"order_id":"cf22490f-6d4c-477a-99ba-2088cee07804","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":400.00,"status":"APPROVED"}
Таким образом, это говорит мне о том, что с кодом или конфигурацией в application.yaml все в порядке; скорее, это указывает на некоторые проблемы с конфигурацией/подключением самих брокеров Kafka, возможно, из-за того, как вы настроили их в своем скрипте docker-compose. Я бы посоветовал начать смотреть там и посмотреть, видите ли вы какие-либо проблемы.
Судя по всему, приложение запускается и обрабатывает события правильно, но все темы по-прежнему создаются только в брокере kafka-one, второй брокер не работает, мое намерение состоит в том, чтобы темы order-created* существовали только на kafka-one, а темы order-processed* должны существовать только на kafka-two. только.
Я пытался использовать свойство auto-create-topics со значениями true и false, но все темы по-прежнему создавались с использованием kafka-one. У вас случайно нет идей по поводу того, что это происходит?
даже когда я создаю отдельные темы вручную на двух брокерах Kafka, приложение подключает только потребителя kafka-one, у kafka-two нет ни одного подключенного потребителя, для меня это странное поведение.
Можете ли вы запустить два отдельных кластера Kafka локально, один на localhost:9092, а другой на localhost:9093, и посмотреть, увидите ли вы одну и ту же проблему? Если та же проблема возникает и в этой настройке, скорее всего, это ошибка, и мы будем рады ее дальнейшему исследованию. Но сначала мы хотим проверить эту теорию.
@sobychako большое спасибо за помощь и внимание, я проанализировал подробнее свою настройку, и, судя по всему, проблема была вызвана неправильными настройками докер-контейнеров Kafka, после настройки приложение заработало нормально и с ожидаемым поведением!
Основная проблема, связанная с этим, была вызвана неправильными конфигурациями контейнеров докеров.
Внутри сети докеров все контейнеры Kafka работали на порту 9092, и это вызывало неожиданное поведение!
Старый docker-compose.yaml:
version: "3.9"
services:
spot.zookeeper.one:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.one
restart: "no"
hostname: spot.zookeeper.one
ports:
- "2282:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-one
volumes:
- zookeeper_data_one:/bitnami
spot.broker.one:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.one
hostname: spot.broker.one
restart: "no"
ports:
- "9092:9092"
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.one
networks:
- spot-network-one
volumes:
- kafka_data_one:/bitnami
spot.zookeeper.two:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.two
restart: "no"
hostname: spot.zookeeper.two
ports:
- "2283:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-two
volumes:
- zookeeper_data_two:/bitnami
spot.broker.two:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.two
hostname: spot.broker.two
restart: "no"
ports:
- "9093:9092"
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.two
networks:
- spot-network-two
volumes:
- kafka_data_two:/bitnami
spot.kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: spot.kafka-ui
restart: "no"
environment:
KAFKA_CLUSTERS_0_NAME: spot-one
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
KAFKA_CLUSTERS_1_NAME: spot-two
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19092
KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
ports:
- "8580:8080"
depends_on:
- spot.zookeeper.one
- spot.broker.one
- spot.zookeeper.two
- spot.broker.two
networks:
- spot-network-one
- spot-network-two
networks:
spot-network-one:
driver: bridge
spot-network-two:
driver: bridge
volumes:
zookeeper_data_one:
driver: local
kafka_data_one:
driver: local
zookeeper_data_two:
driver: local
kafka_data_two:
driver: local
Скорректированные docker-compose.yaml:
version: "3.9"
services:
spot.zookeeper.one:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.one
restart: "no"
hostname: spot.zookeeper.one
ports:
- "2282:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-one
volumes:
- zookeeper_data_one:/bitnami
spot.broker.one:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.one
hostname: spot.broker.one
restart: "no"
ports:
- "9092:9092"
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.one
networks:
- spot-network-one
volumes:
- kafka_data_one:/bitnami
spot.zookeeper.two:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.two
restart: "no"
hostname: spot.zookeeper.two
ports:
- "2283:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-two
volumes:
- zookeeper_data_two:/bitnami
spot.broker.two:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.two
hostname: spot.broker.two
restart: "no"
ports:
- "9093:9093" # modified here
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19093,LISTENER_DOCKER_EXTERNAL://:9093" # modified here
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19093,LISTENER_DOCKER_EXTERNAL://localhost:9093" # modified here
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9093" # modified here
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.two
networks:
- spot-network-two
volumes:
- kafka_data_two:/bitnami
spot.kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: spot.kafka-ui
restart: "no"
environment:
KAFKA_CLUSTERS_0_NAME: spot-one
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
KAFKA_CLUSTERS_1_NAME: spot-two
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19093 # modified here
KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
ports:
- "8580:8080"
depends_on:
- spot.zookeeper.one
- spot.broker.one
- spot.zookeeper.two
- spot.broker.two
networks:
- spot-network-one
- spot-network-two
networks:
spot-network-one:
driver: bridge
spot-network-two:
driver: bridge
volumes:
zookeeper_data_one:
driver: local
kafka_data_one:
driver: local
zookeeper_data_two:
driver: local
kafka_data_two:
driver: local
Все скорректированные точки отмечены комментарием # modified here в строке файла.
Мне ваша конфигурация кажется правильной. Вы можете сравнить это приложение с примером: github.com/spring-cloud/spring-cloud-stream-samples/blob/main/…