Приложение Kafka и Python в Kubernetes в отдельных модулях — NoBrokersAvailable()

для начала - я новичок в Kubernetes, и я могу опустить некоторые основы.

У меня есть работающее контейнерное приложение, которое организовано с помощью docker-compose (и работает нормально), и я переписываю его для развертывания в Kubernetes. Я преобразовал его в файлы .yaml K8s через Kompose и в некоторой степени изменил его. Я изо всех сил пытаюсь установить соединение между приложением Python и Kafka, которые работают на отдельных модулях. Приложение Python постоянно возвращает ошибку NoBrokersAvailable() независимо от того, что я пытаюсь применить — совершенно очевидно, что оно не может подключиться к брокеру. Что мне не хватает? Я определил правильных слушателей и сетевую политику. Я запускаю его локально на Minikube с локальным реестром образов Docker.

Приложение Python подключается к следующему адресу: KafkaProducer(bootstrap_servers='kafka-service.default.svc.cluster.local:9092')

kafka-deployment.yaml (образ Dockerfile основан на confluentinc/cp-kafka:6.2.0 с добавленным к нему скриптом настройки тем):

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka-app
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: kafka
    spec:
      containers:
        - env:
            - name: KAFKA_LISTENERS
              value: "LISTENER_INTERNAL://0.0.0.0:29092,LISTENER_EXTERNAL://0.0.0.0:9092"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "LISTENER_INTERNAL://localhost:29092,LISTENER_EXTERNAL://kafka-service.default.svc.cluster.local:9092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "LISTENER_EXTERNAL:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "LISTENER_INTERNAL"
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper:2181
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          image: finnhub-streaming-data-pipeline-kafka:latest
          imagePullPolicy: Never
          lifecycle:
            postStart:
              exec: 
                command: ["/bin/sh","-c","/kafka-setup-k8s.sh"]
          name: kafka-app
          ports:
            - containerPort: 9092
            - containerPort: 29092
          resources: {}
      restartPolicy: Always

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    app: kafka
  ports:
    - protocol: TCP
      name: firstport
      port: 9092
      targetPort: 9092
    - protocol: TCP
      name: secondport
      port: 29092
      targetPort: 29092

finnhub-producer.yaml (он же развертывание моего приложения Python):

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: finnhubproducer
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: finnhubproducer
    spec:
      containers:
        - env:
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_SERVER
              value: kafka-service.default.svc.cluster.local
            - name: KAFKA_TOPIC_NAME
              value: market
          image: docker.io/library/finnhub-streaming-data-pipeline-finnhubproducer:latest
          imagePullPolicy: Never
          name: finnhubproducer
          ports:
            - containerPort: 8001
          resources: {}
      restartPolicy: Always
status: {}

---
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  ports:
    - name: "8001"
      port: 8001
      targetPort: 8001
  selector:
    io.kompose.service: finnhubproducer
status:
  loadBalancer: {}

конвейерная сеть-сетевая политика.yaml:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  creationTimestamp: null
  name: pipeline-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/pipeline-network: "true"
  podSelector:
    matchLabels:
      io.kompose.network/pipeline-network: "true"

Обновлено: Dockerfile для образа Kafka:

FROM confluentinc/cp-kafka:6.2.0

COPY ./scripts/kafka-setup-k8s.sh /kafka-setup-k8s.sh

кафка-настройка-k8s.sh:


# blocks until kafka is reachable
kafka-topics --bootstrap-server localhost:29092 --list

echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic market --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server localhost:29092 --list
Развертывание модели машинного обучения с помощью Flask - Angular в Kubernetes
Развертывание модели машинного обучения с помощью Flask - Angular в Kubernetes
Kubernetes - это портативная, расширяемая платформа с открытым исходным кодом для управления контейнерными рабочими нагрузками и сервисами, которая...
Как создать PHP Image с нуля
Как создать PHP Image с нуля
Сегодня мы создадим PHP Image from Scratch для того, чтобы легко развернуть базовые PHP-приложения. Пожалуйста, имейте в виду, что это разработка для...
1
0
74
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Селектор приложений вашего сервиса — kafka, а развертывание — kafka-app, поэтому они не связаны.

Я предлагаю вам использовать Strimzi (или Confluent для Kubernetes, если вы хотите использовать их образы), а не преобразовывать существующий файл Docker Compose с помощью Kompose, поскольку он редко корректирует сетевые политики. На самом деле, вы, вероятно, можете удалить сетевые метки и полностью удалить сетевую политику, так как это на самом деле не нужно в том же пространстве имен.

Что касается вашего приложения Python, вам не нужно отдельно определять хост и порт Kafka; используйте одну переменную для KAFKA_BOOTSTRAP_SERVERS, которая может принимать несколько брокеров, включая их порты

Спасибо за отзыв. Я исправил имя селектора приложений, но это не решило проблему. Как я уже писал, образ Docker — это confluentinc/cp-kafka:6.2.0 с той лишь разницей, что я скопировал сценарий оболочки (который используется в жизненном цикле) в образ в пользовательском файле Docker для настройки тем. Я посмотрю на Стримзи, если не найду решение до конца дня.

RSK RSK 04.02.2023 17:21

Не видя, что на самом деле делает ваш сценарий оболочки, вы переопределили там собственные команды Confluent, так действительно ли брокер запускается? Вы должны иметь возможность использовать AdminClient из библиотеки Python Kafka для создания тем во время выполнения, а не пытаться сделать это из брокера.

OneCricketeer 04.02.2023 17:28

Да, брокер, кажется, работает по назначению, основываясь на журналах из модуля Kafka. Я добавил файл Dockerfile и сценарий оболочки в исходный пост, хотя пытался запустить его с исходным слитным образом с тем же результатом (снова выдает ошибку NoBrokerAvailable()). Мне кажется проблема с сетью, но не уверен на 100%

RSK RSK 04.02.2023 17:36

Одним из шагов отладки будет kubectl run изображение с установленным kcat, а затем использование kcat -L -b kafka-service.default.svc.cluster.local:9092

OneCricketeer 04.02.2023 17:39

Я могу взаимодействовать с брокером внутри пода на localhost:29092, но он не работает с kafka-service:9092, запуская скрипты следующим образом: kubectl create -n default -f - <<EOF apiVersion: v1 kind: Pod metadata: name: kafka-test spec: containers: - name: kafka-test image: edenhill/kcat:1.7.0 # Just spin & wait forever command: [ "/bin/sh", "-c", "--" ] args: [ "while true; do sleep 3000; done;" ] EOF После запуска пода: kubectl exec -it -n default kafka-test -- shkcat -L -b kafka-service.default.svc.cluster.local:9092

RSK RSK 06.02.2023 15:37

Я также удалил аннотации сетевой политики для файлов развертывания приложений kafka, zookeeper и python.

RSK RSK 06.02.2023 15:44

Как я уже сказал, компоновка — не лучшая отправная точка. В Confluent есть как операторы, так и (неподдерживаемые) Helm Charts. Вот личный пример использования контейнера Python/Flask (папка data-gen), подключенного к серверу Kafka, развернутому через Helm. Если вы хотите использовать более поддерживаемые диаграммы Helm или (бесплатные) Operators, я обычно рекомендую Strimzi.

OneCricketeer 06.02.2023 17:27

Мне только что удалось это исправить :) проблема была связана с маркировкой Kompose. Большое спасибо за ваш вклад и хорошего дня!

RSK RSK 06.02.2023 17:30
Ответ принят как подходящий

Мне удалось заставить его работать, удалив службы из развертывания и запуска kubectl expose deployment kafka-app. Проблема возникает из-за маркировки Kompose.

Другие вопросы по теме