Дорогие доброжелательные помощники,
У меня есть индекс, который загружается из базы данных через Kafka. Теперь эта база данных содержит поле, которое объединяет пару фрагментов информации, таких как ключ/значение; ключ/значение; (не спрашивайте о причине, я понятия не имею, кому это понравилось и почему ;-))
93/4; 34/12;
он может быть пустым или содержать 1..n пар ключ/значение.
Я хочу использовать конвейер загрузки и в идеале иметь «вложенное» поле, которое содержит все значения, находящиеся в этом поле.
Наверное вот так:
{"категории": { "93": 7, «82»: 4 } }
Вариант использования следующий: мы хотим визуализировать сумму отфильтрованного количества этих категорий (они говорят мне, сколько минут занял конкретный процесс) и связать их в диапазонах.
Пример: я фильтрую категории x, y, z, а затем группирую количество документов за день без задержки, с задержкой до 5 минут и с задержкой от 5 до 15 минут.
Я попытался аккуратно разделить поля с помощью процессора kv и хотел работать с этого момента, но я думаю, что это был совершенно неправильный подход.
"kv": {
"field": "IncomingField",
"field_split": ";",
"value_split": "/",
"target_field": "delays",
"ignore_missing": true,
"trim_key": "\\s",
"trim_value": "\\s",
"ignore_failure": true
}
Когда я тестирую конвейер, кажется, что все в порядке
"delays": {
"62": "3",
"86": "2"
}
но есть две вещи, которые не работают.
Я изучил другие процессоры (dorexpander), но не могу понять, как заставить это работать.
Я надеюсь, что мой вопрос ясен (мне не хватает навыков английского, извините) и что кто-то может указать мне правильное направление.
Большое спасибо!
Вам лучше структурировать их как массив объектов с общими средствами доступа, например:
[ {key: 93, value: 7}, ...]
Таким образом, вы сможете агрегировать categories.key
и categories.value
.
Таким образом, это означает повторение категорий entrySet()
с использованием пользовательского обработчика сценариев, например:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "extracts k/v pairs",
"processors": [
{
"script": {
"source": """
def categories = ctx.categories;
def kv_pairs = new ArrayList();
for (def pair : categories.entrySet()) {
def k = pair.getKey();
def v = pair.getValue();
kv_pairs.add(["key": k, "value": v]);
}
ctx.categories = kv_pairs;
"""
}
}
]
},
"docs": [
{
"_source": {
"categories": {
"82": 4,
"93": 7
}
}
}
]
}
P.S.: Убедитесь, что ваше поле categories
отображено как вложенное b/c, иначе вы потеряете связь между ключами и значениями (также называемое сглаживанием).
Дорогой Джо, спасибо за ответ и извините за поздний ответ. Думаю, это решает половину моей проблемы. В источнике, который у меня есть, поле выглядит так, как указано в первой части моего сообщения: «65/3; 81/1; 98/5». ТАК, не могли бы вы сначала применить процессор kv, а затем предложенное вами решение?
Да, либо это, либо все разделение KV в одном сценарии.
Не могли бы вы помочь больше со сценариями. Эти армейские первые шаги в написании сценариев безболезненны, и я должен сказать, что поиск нужной документации действительно мучителен! Где найти все методы и т.д. и т.п. Какая функция была бы эквивалентна процессору kv, если бы я захотел его использовать. Или у вас есть хороший источник документации, где я могу прочитать? Кажется, я брожу по кругу между эластиком, java-сайтом и прочими. Извините за недостаток знаний здесь и спасибо за вашу готовность помочь!
В итоге я использовал два процессора подряд в узле загрузки в соответствии с рекомендацией Джо. Таким образом, для меня ответ решил мою проблему! Спасибо Джо!
Без проблем. Я не профессионал в Java, поэтому я бы сделал то же самое, что и вы — google😉
Привет, извините, я неправильно прочитал вопрос. Моей отправной точкой было
{"delays":{"62":"3","86":"2"}}
, и именно здесьentrySet()
сработает. Поскольку фактической отправной точкой является строка, подобная93/4; 34/12;
, вам нужно сначала разбить ее по пробелам, а затем по/
— все это можно сделать внутри скрипта. Надеюсь это поможет.