Как запустить коннектор mongo-kafka в качестве источника для kafka и интегрировать его с вводом logstash, чтобы использовать elasticsearch в качестве приемника?

Я создал сборку https://github.com/mongodb/mongo-kafka

Но как это работает для подключения к моему запущенному экземпляру kafka.

Даже как глупо звучит этот вопрос. Но, похоже, нет доступной документации, чтобы заставить это работать с локально запущенным replicaset из mongodb.

Все блоги указывают на использование mongo atlas.

Если у вас есть хороший ресурс, пожалуйста, направьте меня к нему.

ОБНОВЛЕНИЕ 1 --

Используемый плагин maven - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

Поместил его в плагины кафки, перезапустил кафку.

ОБНОВЛЕНИЕ 2. Как включить mongodb в качестве источника для kafka?

https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties

файл, который будет использоваться в качестве конфигурации для Kafka

bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

ОБНОВЛЕНИЕ 3. Вышеупомянутый метод не сработал, если вернуться к блогу, в котором не упоминается, что такое порт 8083.

Установил Confluent и confluent-hub, все еще не уверен, что mongo-connector работает с kafka.

ОБНОВЛЕНИЕ 4 -

Zookeeper, Kafka Server, Kafka connect работает

Файлы библиотеки Mongo KafkaФайлы библиотеки Kafka Connect Avro Connector

Используя приведенные ниже команды, мой источник заработал -

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties

И, используя приведенную ниже конфигурацию для logstash, я смог передать данные в elasticsearch -

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
        hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

Итак, теперь один MongoSourceConnector.properties хранит одно имя коллекции, из которого он считывается, мне нужно запустить kafka connect с разными файлами свойств для каждой коллекции.

Мой Logstash отправляет новые данные в elasticsearch вместо обновления старых данных. Кроме того, он не создает индексы в соответствии с названием коллекции. Идея в том, что это должно идеально синхронизироваться с моей базой данных MongoDB.

ПОСЛЕДНЕЕ ОБНОВЛЕНИЕ - Теперь все работает гладко,

  • Создано несколько файлов свойств для kafka connect.
  • Последний logstash фактически создает индекс в соответствии с именем темы и соответствующим образом обновляет индексы.

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec =>
        rubydebug {
            metadata => true
        }
    }
}

Вместо того, чтобы редактировать вопрос, вы можете опубликовать ответ с вашим решением

OneCricketeer 23.12.2020 18:37

Хорошо, я сделаю это.

zion 24.12.2020 03:57
Использование JavaScript и MongoDB
Использование JavaScript и MongoDB
Сегодня я собираюсь вкратце рассказать о прототипах в JavaScript, а также представить и объяснить вам работу с базой данных MongoDB.
1
2
1 017
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Порт 8083 — это Kafka Connect, который вы запускаете с помощью одного из скриптов connect-*.sh.

Он автономен от брокера, и свойства не устанавливаются от kafka-server-start

Понял. Спасибо.

zion 23.12.2020 06:14
Ответ принят как подходящий

Шаги для успешной синхронизации MongoDb с Elasticsearch —

  • Сначала разверните реплику mongodb —
//Make sure no mongo deamon instance is running
//To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN 

//Kill the process Id of mongo instance
sudo kill 775

//Deploy replicaset
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
  • Создать свойства конфигурации для Kafka
//dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
  • Убедитесь, что файлы JAR по указанным ниже URL-адресам доступны для ваших плагинов kafka -

Поиск в центральном репозитории Maven

Конвертер Kafka Connect Avro

  • Развернуть кафку
//Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

//Kaka Server
bin/kafka-server-start.sh config/server.properties

//Kaka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
  • Конфигурация Logstash -
// /etc/logstash/conf.d/apache.conf  <- File 
input {
  kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["dummydatabase.dummycollection"]
  }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
      codec =>
        rubydebug {
            metadata => true
        }
    }
}
  • Запустите ElasticSearch, Kibana и Logstash.
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
  • Тест

Откройте Mongo Compass и

  • создайте коллекцию, упомяните эту коллекцию в темах logstash и создайте файлы свойств для Kafka
  • Добавьте в него данные
  • Обновить данные

Обзор индексов в Elasticsearch

Другие вопросы по теме