У меня есть кафка-машина, работающая в AWS, которая состоит из нескольких тем. У меня есть следующая функция Lambda, которая создает сообщение и отправляет его в одну из тем кафки.
import json from kafka
import KafkaClient from kafka
import SimpleProducer from kafka
import KafkaProducer
def lambda_handler(event, context):
kafka = KafkaClient("XXXX.XXX.XX.XX:XXXX")
print(kafka)
producer = SimpleProducer(kafka, async = True)
print(producer)
task_op = {
"'message": "Hai, Calling from AWS Lambda"
}
print(json.dumps(task_op))
producer.send_messages("topic_atx_ticket_update",json.dumps(task_op).encode('utf-8'))
print(producer.send_messages)
return ("Messages Sent to Kafka Topic")
Но я вижу, что сообщения не отправляются, как я ожидал.
Примечание. Нет проблем с ролями и политиками, подключением.
в файле lambda_handler.py я создаю клиент kafka и использую API Kafka Producer для создания сообщений. С моего локального ноутбука я смог отправить сообщения в тему кафки.
Можете ли вы добавить код в свой вопрос?
Вы говорите «нет проблем с… подключением» -> можете ли вы подробно рассказать, как вы это определили? Распространенной проблемой в этом сценарии является неправильная настройка прослушивателей Kafka.
import json из kafka import KafkaClient из kafka import SimpleProducer из kafka import KafkaProducer def lambda_handler(event, context): kafka = KafkaClient("XXXX.XXX.XX.XX:XXXX") print(kafka) производитель = SimpleProducer(kafka, async= True) print(producer) task_op= {"'message":"Привет, звоню из AWS Lambda"} print(json.dumps(task_op)) производитель.send_messages("topic_atx_ticket_update", json.dumps(task_op).encode(' utf-8')) print(producer.send_messages) return("Сообщения, отправленные в тему Kafka")





При создании объекта Kafka Producer
producer = SimpleProducer(kafka, async=True)
«асинхронная» строка должна быть ложной, например
producer = SimpleProducer(kafka, async=False)
Потом,
вы можете отправить сообщение Kafka в тему из AWS Lambda.
Не могли бы вы привести пример вашего кода.