Я создал сборку https://github.com/mongodb/mongo-kafka
Но как это работает для подключения к моему запущенному экземпляру kafka.
Даже как глупо звучит этот вопрос. Но, похоже, нет доступной документации, чтобы заставить это работать с локально запущенным replicaset
из mongodb
.
Все блоги указывают на использование mongo atlas.
Если у вас есть хороший ресурс, пожалуйста, направьте меня к нему.
ОБНОВЛЕНИЕ 1 --
Используемый плагин maven - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
Поместил его в плагины кафки, перезапустил кафку.
ОБНОВЛЕНИЕ 2. Как включить mongodb в качестве источника для kafka?
https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties
файл, который будет использоваться в качестве конфигурации для Kafka
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
ОБНОВЛЕНИЕ 3. Вышеупомянутый метод не сработал, если вернуться к блогу, в котором не упоминается, что такое порт 8083.
Установил Confluent и confluent-hub, все еще не уверен, что mongo-connector работает с kafka.
ОБНОВЛЕНИЕ 4 -
Zookeeper, Kafka Server, Kafka connect работает
Файлы библиотеки Mongo KafkaФайлы библиотеки Kafka Connect Avro Connector
Используя приведенные ниже команды, мой источник заработал -
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
И, используя приведенную ниже конфигурацию для logstash, я смог передать данные в elasticsearch -
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["users","organisations","skills"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
Итак, теперь один MongoSourceConnector.properties хранит одно имя коллекции, из которого он считывается, мне нужно запустить kafka connect с разными файлами свойств для каждой коллекции.
Мой Logstash отправляет новые данные в elasticsearch вместо обновления старых данных. Кроме того, он не создает индексы в соответствии с названием коллекции. Идея в том, что это должно идеально синхронизироваться с моей базой данных MongoDB.
ПОСЛЕДНЕЕ ОБНОВЛЕНИЕ - Теперь все работает гладко,
input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["users","organisations","skills"]
}
}
filter {
json {
source => "message"
target => "json_payload"
}
json {
source => "[json_payload][payload]"
target => "payload"
}
mutate {
add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
rename => { "[payload][fullDocument]" => "document"}
remove_field => ["message","json_payload","payload"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
action => "update"
doc_as_upsert => true
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}
Хорошо, я сделаю это.
Порт 8083 — это Kafka Connect, который вы запускаете с помощью одного из скриптов connect-*.sh
.
Он автономен от брокера, и свойства не устанавливаются от kafka-server-start
Понял. Спасибо.
Шаги для успешной синхронизации MongoDb с Elasticsearch —
//Make sure no mongo deamon instance is running
//To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN
//Kill the process Id of mongo instance
sudo kill 775
//Deploy replicaset
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
//dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
Поиск в центральном репозитории Maven
//Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
//Kaka Server
bin/kafka-server-start.sh config/server.properties
//Kaka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
// /etc/logstash/conf.d/apache.conf <- File
input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["dummydatabase.dummycollection"]
}
}
filter {
json {
source => "message"
target => "json_payload"
}
json {
source => "[json_payload][payload]"
target => "payload"
}
mutate {
add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
rename => { "[payload][fullDocument]" => "document"}
remove_field => ["message","json_payload","payload"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
action => "update"
doc_as_upsert => true
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
Откройте Mongo Compass и
Обзор индексов в Elasticsearch
Вместо того, чтобы редактировать вопрос, вы можете опубликовать ответ с вашим решением