Я пытаюсь вставить запись в elasticsearch, а также обновить поле существующего документа, чей _id я получу из текущей записи. После поиска в Интернете я обнаружил, что мы можем использовать API _update_by_query с плагином http в logstash. Это приведенная ниже конфигурация.
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index_*"
document_id => "%{id_field}"
}
http {
url => "http://localhost:9200/my_index_*/_update_by_query"
http_method => "post"
content_type => "application/json"
format => "message"
message => '{"query":{"match":{"_id":"%{previous_record_id}"}},"script":{"source":"ctx._source.field_to_be_updated=xyz","lang":"painless"}}'
}
}
У Elasticsearch нет защиты паролем, поэтому я не добавлял заголовок авторизации. Но когда я запускаю logstash, текущая запись вставляется, но я всегда получаю следующую ошибку для плагина http.
2022-05-05T11:31:51,916][ERROR][logstash.outputs.http ][logstash_txe] [HTTP Output Failure] Encountered non-2xx HTTP code 400 {:response_code=>400, :url=>"http://localhost:9200/my_index_*/_update_by_query", :event=>#<LogStash::Event:0x192606f8>}
Это не то, как вы должны это делать, вы можете просто использовать вывод elasticsearch для обоих случаев использования.
Первый для индексации новой записи, а следующий для частичного обновления другой записи с идентификатором previous_record_id. Доступ к данным события можно получить в params.event внутри скрипта:
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index_xyz"
document_id => "%{previous_record_id}"
action => "update"
script => "ctx._source.field_to_be_updated = params.event.xyz"
script_lang => "painless"
script_type => "inline"
}
Имя индекса должно быть конкретным, оно не может быть псевдонимом или именем индекса с подстановочными знаками, например my_index_*.
Это проблема. Я не знаю, в каком индексе хранится документ, и поэтому я подумал, что update_by_query будет лучшим способом. Какой другой альтернативный подход вы можете придумать?
Ваши индексы основаны на времени? Если да, то как вы решаете, в какой индекс будет первоначально проиндексирован новый документ?
Да! Мы сохраняем их в индексе на основе года и номера недели из поля метки времени, которое мы получаем в записи. Пример: my_index_2022w18. Итак, вы предлагаете нам получить имя индекса и использовать этот индекс в выходном плагине elasticsearch? Я тоже об этом подумал. Мы будем запрашивать эту запись с помощью подключаемого модуля фильтра elasticsearch и из ответа сохранять имя индекса?
Как получить previous_record_id? У вас также есть временная метка этой записи?
Мы получаем previous_record_id из текущей записи, но не метку времени предыдущей записи. Единственный способ найти предыдущую запись — использовать этот идентификатор и запросить его по всем индексам. Я не уверен, что мы можем сделать это в плагине фильтра и получить имя индекса этой предыдущей записи.
Да, с фильтром elasticsearch можно, надо попробовать.
Спасибо за подход. Вот что я сделал. elasticsearch { hosts => [ "localhost:9200" ] query => "_id:%{previous_record_id}" index => "my_index" fields => { "timestamp" => "previousRecordsTime" } } Я запросил его и получил временную метку предыдущей записи, сгенерировал на ее основе индекс предыдущей записи и обновил его с помощью скрипта. Одна важная вещь, которую я заметил, заключается в том, что после запроса было невозможно получить имя индекса ответа на запрос. Пробовал { "_index" => "prevIndex" }, но давал ноль. Но эта штука заработала, спасибо большое
Привет. Я пробовал это, но я все еще получаю следующую ошибку. {:status=>400, :action=>["update", {:_id=>"00002", :_index=>"my_index_", :routing=>nil, :_type=>"_doc", :retry_on_conflict=>1}, #<LogStash::Event:0x62a1a697>], :response=>{"update"=>{"_index"=> "мой_индекс_", "type"=>"_doc", "_id"=>"00002", "status"=>400, "error"=>{"type"=>"invalid_index_name_exception", "reason"=>"Недопустимое имя индекса [data_lake], не должен содержать следующие символы [ , \", *, \\, <, |, ,, >, /, ?]", "index_uuid"=>"_na_", "index"=>"my_index_"}}}} Говорит, что индекс не должен содержать '*' .