для начала - я новичок в Kubernetes, и я могу опустить некоторые основы.
У меня есть работающее контейнерное приложение, которое организовано с помощью docker-compose (и работает нормально), и я переписываю его для развертывания в Kubernetes. Я преобразовал его в файлы .yaml K8s через Kompose и в некоторой степени изменил его. Я изо всех сил пытаюсь установить соединение между приложением Python и Kafka, которые работают на отдельных модулях. Приложение Python постоянно возвращает ошибку NoBrokersAvailable() независимо от того, что я пытаюсь применить — совершенно очевидно, что оно не может подключиться к брокеру. Что мне не хватает? Я определил правильных слушателей и сетевую политику. Я запускаю его локально на Minikube с локальным реестром образов Docker.
Приложение Python подключается к следующему адресу:
KafkaProducer(bootstrap_servers='kafka-service.default.svc.cluster.local:9092')
kafka-deployment.yaml (образ Dockerfile основан на confluentinc/cp-kafka:6.2.0 с добавленным к нему скриптом настройки тем):
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
kompose.version: 1.27.0 (b0ed6a2c9)
creationTimestamp: null
labels:
io.kompose.service: kafka
name: kafka-app
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka
strategy: {}
template:
metadata:
annotations:
kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
kompose.version: 1.27.0 (b0ed6a2c9)
creationTimestamp: null
labels:
io.kompose.network/pipeline-network: "true"
io.kompose.service: kafka
spec:
containers:
- env:
- name: KAFKA_LISTENERS
value: "LISTENER_INTERNAL://0.0.0.0:29092,LISTENER_EXTERNAL://0.0.0.0:9092"
- name: KAFKA_ADVERTISED_LISTENERS
value: "LISTENER_INTERNAL://localhost:29092,LISTENER_EXTERNAL://kafka-service.default.svc.cluster.local:9092"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "LISTENER_EXTERNAL:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: "LISTENER_INTERNAL"
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
value: "0"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "1"
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
image: finnhub-streaming-data-pipeline-kafka:latest
imagePullPolicy: Never
lifecycle:
postStart:
exec:
command: ["/bin/sh","-c","/kafka-setup-k8s.sh"]
name: kafka-app
ports:
- containerPort: 9092
- containerPort: 29092
resources: {}
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service
spec:
selector:
app: kafka
ports:
- protocol: TCP
name: firstport
port: 9092
targetPort: 9092
- protocol: TCP
name: secondport
port: 29092
targetPort: 29092
finnhub-producer.yaml (он же развертывание моего приложения Python):
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
kompose.version: 1.27.0 (b0ed6a2c9)
creationTimestamp: null
labels:
io.kompose.service: finnhubproducer
name: finnhubproducer
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: finnhubproducer
strategy: {}
template:
metadata:
annotations:
kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
kompose.version: 1.27.0 (b0ed6a2c9)
creationTimestamp: null
labels:
io.kompose.network/pipeline-network: "true"
io.kompose.service: finnhubproducer
spec:
containers:
- env:
- name: KAFKA_PORT
value: "9092"
- name: KAFKA_SERVER
value: kafka-service.default.svc.cluster.local
- name: KAFKA_TOPIC_NAME
value: market
image: docker.io/library/finnhub-streaming-data-pipeline-finnhubproducer:latest
imagePullPolicy: Never
name: finnhubproducer
ports:
- containerPort: 8001
resources: {}
restartPolicy: Always
status: {}
---
apiVersion: v1
kind: Service
metadata:
annotations:
kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
kompose.version: 1.27.0 (b0ed6a2c9)
creationTimestamp: null
labels:
io.kompose.service: finnhubproducer
name: finnhubproducer
spec:
ports:
- name: "8001"
port: 8001
targetPort: 8001
selector:
io.kompose.service: finnhubproducer
status:
loadBalancer: {}
конвейерная сеть-сетевая политика.yaml:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
creationTimestamp: null
name: pipeline-network
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/pipeline-network: "true"
podSelector:
matchLabels:
io.kompose.network/pipeline-network: "true"
Обновлено: Dockerfile для образа Kafka:
FROM confluentinc/cp-kafka:6.2.0
COPY ./scripts/kafka-setup-k8s.sh /kafka-setup-k8s.sh
кафка-настройка-k8s.sh:
# blocks until kafka is reachable
kafka-topics --bootstrap-server localhost:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic market --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server localhost:29092 --list
Селектор приложений вашего сервиса — kafka
, а развертывание — kafka-app
, поэтому они не связаны.
Я предлагаю вам использовать Strimzi (или Confluent для Kubernetes, если вы хотите использовать их образы), а не преобразовывать существующий файл Docker Compose с помощью Kompose, поскольку он редко корректирует сетевые политики. На самом деле, вы, вероятно, можете удалить сетевые метки и полностью удалить сетевую политику, так как это на самом деле не нужно в том же пространстве имен.
Что касается вашего приложения Python, вам не нужно отдельно определять хост и порт Kafka; используйте одну переменную для KAFKA_BOOTSTRAP_SERVERS
, которая может принимать несколько брокеров, включая их порты
Не видя, что на самом деле делает ваш сценарий оболочки, вы переопределили там собственные команды Confluent, так действительно ли брокер запускается? Вы должны иметь возможность использовать AdminClient из библиотеки Python Kafka для создания тем во время выполнения, а не пытаться сделать это из брокера.
Да, брокер, кажется, работает по назначению, основываясь на журналах из модуля Kafka. Я добавил файл Dockerfile и сценарий оболочки в исходный пост, хотя пытался запустить его с исходным слитным образом с тем же результатом (снова выдает ошибку NoBrokerAvailable()). Мне кажется проблема с сетью, но не уверен на 100%
Одним из шагов отладки будет kubectl run
изображение с установленным kcat
, а затем использование kcat -L -b kafka-service.default.svc.cluster.local:9092
Я могу взаимодействовать с брокером внутри пода на localhost:29092, но он не работает с kafka-service:9092, запуская скрипты следующим образом: kubectl create -n default -f - <<EOF apiVersion: v1 kind: Pod metadata: name: kafka-test spec: containers: - name: kafka-test image: edenhill/kcat:1.7.0 # Just spin & wait forever command: [ "/bin/sh", "-c", "--" ] args: [ "while true; do sleep 3000; done;" ] EOF
После запуска пода: kubectl exec -it -n default kafka-test -- sh
kcat -L -b kafka-service.default.svc.cluster.local:9092
Я также удалил аннотации сетевой политики для файлов развертывания приложений kafka, zookeeper и python.
Как я уже сказал, компоновка — не лучшая отправная точка. В Confluent есть как операторы, так и (неподдерживаемые) Helm Charts. Вот личный пример использования контейнера Python/Flask (папка data-gen
), подключенного к серверу Kafka, развернутому через Helm. Если вы хотите использовать более поддерживаемые диаграммы Helm или (бесплатные) Operators, я обычно рекомендую Strimzi.
Мне только что удалось это исправить :) проблема была связана с маркировкой Kompose. Большое спасибо за ваш вклад и хорошего дня!
Мне удалось заставить его работать, удалив службы из развертывания и запуска kubectl expose deployment kafka-app
. Проблема возникает из-за маркировки Kompose.
Спасибо за отзыв. Я исправил имя селектора приложений, но это не решило проблему. Как я уже писал, образ Docker — это confluentinc/cp-kafka:6.2.0 с той лишь разницей, что я скопировал сценарий оболочки (который используется в жизненном цикле) в образ в пользовательском файле Docker для настройки тем. Я посмотрю на Стримзи, если не найду решение до конца дня.