Я попытался подключиться к Confluent Kafka с помощью KafkaSource (из MLRun) и исторически использовал этот простой код:
# code with usage 'kafka-python>=2.0.2'
from kafka import KafkaProducer, KafkaConsumer
consumer = KafkaConsumer(
'ak47-data.v1',
bootstrap_servers =[
'cpkafka01.eu.prod:9092',
'cpkafka02.eu.prod:9092',
'cpkafka03.eu.prod:9092'
],
client_id='test',
auto_offset_reset='earliest',
sasl_mechanism = "SCRAM-SHA-256",
sasl_plain_password = "***********",
sasl_plain_username = "***********",
security_protocol='SASL_SSL',
ssl_cafile = "/v3io/bigdata/rootca.crt",
ssl_certfile=None,
ssl_keyfile=None)
# print first topic
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key, message.value))
break
Как переписать этот код с использованием KafkaSource?
Я использую последнюю версию MLRun>=1.1.2 (но также 1.2.0-rc13)
Позвольте мне поделиться кодом функции для KafkaSource (для MLRun>=1.1.0). Вы также можете указать сертификат (см. rootca.crt) и список тем kafka.
from mlrun.datastore.sources import KafkaSource
# certificate
with open('/v3io/bigdata/rootca.crt') as x:
caCert = x.read()
# definition of KafkaSource
kafka_source = KafkaSource(
brokers=['cpkafka01.eu.prod:9092',
'cpkafka02.eu.prod:9092',
'cpkafka03.eu.prod:9092'],
topics=["ak47-data.v1"],
initial_offset = "earliest",
group = "test",
attributes = {"sasl" : {
"enable": True,
"password" : "******",
"user" : "*******",
"handshake" : True,
"mechanism" : "SCRAM-SHA-256"},
"tls" : {
"enable": True,
"insecureSkipVerify" : False
},
"caCert" : caCert}
)
Какую версию MLRun вы используете?