Spring Cloud Stream Конфигурация нескольких кластеров Kafka

В моем проекте мне нужно подключиться к двум разным брокерам Kafka.

Мой application.yaml выглядит примерно так:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093

Но когда я запустил приложение, это не сработало, это вызвало следующую ошибку:

2024-03-05T23:35:48.473-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00  INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata

Я хочу разделить темы Kafka на два кластера:

  • кафка — тот, который содержит order-created и order-created-dlq
  • кафка — два, содержащие order-processed и order-processed-dlq

Я использую:

  • Весенняя загрузка 3.2.3
  • Весеннее облако 2023.0.0

У меня есть два кластера Kafka, которые нормально работают в моей среде разработки с док-контейнерами: один доступен через порт 9092, а другой — через порт 9093.

Как это отрегулировать?

Мне ваша конфигурация кажется правильной. Вы можете сравнить это приложение с примером: github.com/spring-cloud/spring-cloud-stream-samples/blob/mai‌​n/…

sobychacko 06.03.2024 18:15

Похоже, возникли проблемы с подключением Kafka.

sobychacko 06.03.2024 18:15

@sobychacko Спасибо за ваш ответ, я использовал именно этот пример в качестве образца, но у меня это не работает.

Jonathan Henrique Medeiros 06.03.2024 20:38

@sobychacko У вас есть еще предложения?

Jonathan Henrique Medeiros 06.03.2024 20:59

Не то чтобы я мог об этом подумать. Если вы сможете собрать свое приложение в виде небольшого примера на GH и сценарии создания докера для Kafka, я смогу попробовать это со своей стороны и посмотреть, что происходит.

sobychacko 06.03.2024 21:56

Это репозиторий моего приложения: github.com/jonathanmdr/Spot/tree/feature/multiple-kafka-brok‌​ers

Jonathan Henrique Medeiros 06.03.2024 23:12

Пожалуйста, взгляните на мой ответ с некоторыми выводами.

sobychacko 07.03.2024 21:33
0
7
148
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Я думаю, что есть некоторые проблемы с вашей настройкой Kafka, определенной здесь . Вместо этого я использовал те, которые указаны в Spring Cloud Stream, предоставленные для нужд тестирования. Это дает кластер из 3 узлов с localhost:9091, localhost:9092 и localhost:9093. Вы используете те, которые работают на 9092 и 9093, в своем приложении в ветке feature/multiple-kafka-brokers. Поскольку эти узлы используются в качестве брокеров Kafka, я не вижу никаких ошибок при запуске приложения. При запуске команды CURL, указанной в README, я вижу в консоли следующий вывод:

2024-03-07T15:20:55.577-05:00  INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener     : {"order_id":"89c6ea7a-6fe5-4fa9-91c6-733a5f603b10","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":1000.00,"status":"REJECTED"}
2024-03-07T15:22:25.885-05:00  INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener     : {"order_id":"cf22490f-6d4c-477a-99ba-2088cee07804","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":400.00,"status":"APPROVED"}

Таким образом, это говорит мне о том, что с кодом или конфигурацией в application.yaml все в порядке; скорее, это указывает на некоторые проблемы с конфигурацией/подключением самих брокеров Kafka, возможно, из-за того, как вы настроили их в своем скрипте docker-compose. Я бы посоветовал начать смотреть там и посмотреть, видите ли вы какие-либо проблемы.

Судя по всему, приложение запускается и обрабатывает события правильно, но все темы по-прежнему создаются только в брокере kafka-one, второй брокер не работает, мое намерение состоит в том, чтобы темы order-created* существовали только на kafka-one, а темы order-processed* должны существовать только на kafka-two. только.

Jonathan Henrique Medeiros 08.03.2024 01:54

Я пытался использовать свойство auto-create-topics со значениями true и false, но все темы по-прежнему создавались с использованием kafka-one. У вас случайно нет идей по поводу того, что это происходит?

Jonathan Henrique Medeiros 08.03.2024 02:02

даже когда я создаю отдельные темы вручную на двух брокерах Kafka, приложение подключает только потребителя kafka-one, у kafka-two нет ни одного подключенного потребителя, для меня это странное поведение.

Jonathan Henrique Medeiros 08.03.2024 14:50

Можете ли вы запустить два отдельных кластера Kafka локально, один на localhost:9092, а другой на localhost:9093, и посмотреть, увидите ли вы одну и ту же проблему? Если та же проблема возникает и в этой настройке, скорее всего, это ошибка, и мы будем рады ее дальнейшему исследованию. Но сначала мы хотим проверить эту теорию.

sobychacko 09.03.2024 00:28

@sobychako большое спасибо за помощь и внимание, я проанализировал подробнее свою настройку, и, судя по всему, проблема была вызвана неправильными настройками докер-контейнеров Kafka, после настройки приложение заработало нормально и с ожидаемым поведением!

Jonathan Henrique Medeiros 09.03.2024 06:18
Ответ принят как подходящий

Основная проблема, связанная с этим, была вызвана неправильными конфигурациями контейнеров докеров.

Внутри сети докеров все контейнеры Kafka работали на порту 9092, и это вызывало неожиданное поведение!

Старый docker-compose.yaml:

version: "3.9"

services:

  spot.zookeeper.one:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.one
    restart: "no"
    hostname: spot.zookeeper.one
    ports:
      - "2282:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-one
    volumes:
      - zookeeper_data_one:/bitnami

  spot.broker.one:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.one
    hostname: spot.broker.one
    restart: "no"
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.one
    networks:
      - spot-network-one
    volumes:
      - kafka_data_one:/bitnami

  spot.zookeeper.two:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.two
    restart: "no"
    hostname: spot.zookeeper.two
    ports:
      - "2283:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-two
    volumes:
      - zookeeper_data_two:/bitnami

  spot.broker.two:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.two
    hostname: spot.broker.two
    restart: "no"
    ports:
      - "9093:9092"
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.two
    networks:
      - spot-network-two
    volumes:
      - kafka_data_two:/bitnami

  spot.kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: spot.kafka-ui
    restart: "no"
    environment:
      KAFKA_CLUSTERS_0_NAME: spot-one
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
      KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
      KAFKA_CLUSTERS_1_NAME: spot-two
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19092
      KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
    ports:
      - "8580:8080"
    depends_on:
      - spot.zookeeper.one
      - spot.broker.one
      - spot.zookeeper.two
      - spot.broker.two
    networks:
      - spot-network-one
      - spot-network-two

networks:
  spot-network-one:
    driver: bridge
  spot-network-two:
    driver: bridge

volumes:
  zookeeper_data_one:
    driver: local
  kafka_data_one:
    driver: local
  zookeeper_data_two:
    driver: local
  kafka_data_two:
    driver: local

Скорректированные docker-compose.yaml:

version: "3.9"

services:

  spot.zookeeper.one:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.one
    restart: "no"
    hostname: spot.zookeeper.one
    ports:
      - "2282:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-one
    volumes:
      - zookeeper_data_one:/bitnami

  spot.broker.one:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.one
    hostname: spot.broker.one
    restart: "no"
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.one
    networks:
      - spot-network-one
    volumes:
      - kafka_data_one:/bitnami

  spot.zookeeper.two:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.two
    restart: "no"
    hostname: spot.zookeeper.two
    ports:
      - "2283:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-two
    volumes:
      - zookeeper_data_two:/bitnami

  spot.broker.two:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.two
    hostname: spot.broker.two
    restart: "no"
    ports:
      - "9093:9093" # modified here
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19093,LISTENER_DOCKER_EXTERNAL://:9093" # modified here
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19093,LISTENER_DOCKER_EXTERNAL://localhost:9093" # modified here
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9093" # modified here
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.two
    networks:
      - spot-network-two
    volumes:
      - kafka_data_two:/bitnami

  spot.kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: spot.kafka-ui
    restart: "no"
    environment:
      KAFKA_CLUSTERS_0_NAME: spot-one
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
      KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
      KAFKA_CLUSTERS_1_NAME: spot-two
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19093 # modified here
      KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
    ports:
      - "8580:8080"
    depends_on:
      - spot.zookeeper.one
      - spot.broker.one
      - spot.zookeeper.two
      - spot.broker.two
    networks:
      - spot-network-one
      - spot-network-two

networks:
  spot-network-one:
    driver: bridge
  spot-network-two:
    driver: bridge

volumes:
  zookeeper_data_one:
    driver: local
  kafka_data_one:
    driver: local
  zookeeper_data_two:
    driver: local
  kafka_data_two:
    driver: local

Все скорректированные точки отмечены комментарием # modified here в строке файла.

Другие вопросы по теме