Моя цель — отсортировать миллионы журналов по отметке времени, которую я получаю от Elasticsearch.
Примеры журналов:
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}
К сожалению, я не могу отсортировать все журналы Elastic. Кажется, я должен сделать это сам.
Подходы, которые я пытался получить, отсортировали данные из эластичности:
es = Search(index = "somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
print(hit['@timestamp'])
Другой подход:
notifications = (es
.query("range", **{
"@timestamp": {
'gte': 'now-48h',
'lt' : 'now'
}
})
.sort("@timestamp")
.scan()
)
Поэтому я ищу способ отсортировать эти журналы самостоятельно или напрямую через Elasticsearch. В настоящее время я сохраняю все данные в локальном «logs.json», и мне кажется, что мне нужно перебрать и отсортировать их самостоятельно.
Вы обязательно должны позволить Elasticsearch выполнить сортировку, а затем вернуть вам уже отсортированные данные.
Проблема в том, что вы используете .scan() . Он использует API сканирования/прокрутки Elasticsearch, который, к сожалению, применяет параметры сортировки только к каждой странице/фрагменту, а не ко всему результату поиска. Это отмечено в документах elasticsearch-dsl по нумерации страниц:
Пагинация
...
Если вы хотите получить доступ ко всем документам, соответствующим вашему запросу, вы можете используйте метод сканирования, который использует API эластичного поиска сканирования/прокрутки:for hit in s.scan(): print(hit.title)
Обратите внимание, что в этом случае результаты не будут сортироваться.
(emphasis mine)
Использование разбиения на страницы, безусловно, является вариантом, особенно когда у вас есть «миллионы журналов», как вы сказали. Существует API разбивки на страницы search_after:
Искать после
Вы можете использовать параметр
search_after
для получения следующей страницы хиты, используя набор значений сортировки с предыдущей страницы.
...
Чтобы получить первую страницу результатов, отправьте поисковый запрос сsort
аргумент.
...
Ответ поиска включает массив значенийsort
для каждый удар.
...
Чтобы получить следующую страницу результатов, повторите предыдущий поиск, используя значенияsort
последнего совпадения в качестве аргументаsearch_after
. ... Аргументы поискаquery
иsort
должны остаться без изменений. Если предоставлен, аргументfrom
должен быть0
(по умолчанию) или-1
.
...
Вы можете повторить этот процесс, чтобы получить дополнительные страницы результатов.
(omitted the raw JSON requests since I'll show a sample in Python below)
Вот пример того, как это сделать с помощью elasticsearch-dsl для Python. Обратите внимание, что я ограничиваю fields
и количество результатов, чтобы упростить тестирование. Важными частями здесь являются sort
и extra(search_after=)
.
search = Search(using=client, index='some-index')
# The main query
search = search.extra(size=100)
search = search.query('range', **{'@timestamp': {'gte': '2020-12-29T09:00', 'lt': '2020-12-29T09:59'}})
search = search.source(fields=('@timestamp', ))
search = search.sort({
'@timestamp': {
'order': 'desc'
},
})
# Store all the results (it would be better to be wrap all this in a generator to be performant)
hits = []
# Get the 1st page
results = search.execute()
hits.extend(results.hits)
total = results.hits.total
print(f'Expecting {total}')
# Get the next pages
# Real use-case condition should be "until total" or "until no more results.hits"
while len(hits) < 1000:
print(f'Now have {len(hits)}')
last_hit_sort_id = hits[-1].meta.sort[0]
search = search.extra(search_after=[last_hit_sort_id])
results = search.execute()
hits.extend(results.hits)
with open('results.txt', 'w') as out:
for hit in hits:
out.write(f'{hit["@timestamp"]}\n')
Это приведет к уже отсортированным данным:
# 1st 10 lines
2020-12-29T09:58:57.749Z
2020-12-29T09:58:55.736Z
2020-12-29T09:58:53.627Z
2020-12-29T09:58:52.738Z
2020-12-29T09:58:47.221Z
2020-12-29T09:58:45.676Z
2020-12-29T09:58:44.523Z
2020-12-29T09:58:43.541Z
2020-12-29T09:58:40.116Z
2020-12-29T09:58:38.206Z
...
# 250-260
2020-12-29T09:50:31.117Z
2020-12-29T09:50:27.754Z
2020-12-29T09:50:25.738Z
2020-12-29T09:50:23.601Z
2020-12-29T09:50:17.736Z
2020-12-29T09:50:15.753Z
2020-12-29T09:50:14.491Z
2020-12-29T09:50:13.555Z
2020-12-29T09:50:07.721Z
2020-12-29T09:50:05.744Z
2020-12-29T09:50:03.630Z
...
# 675-685
2020-12-29T09:43:30.609Z
2020-12-29T09:43:30.608Z
2020-12-29T09:43:30.602Z
2020-12-29T09:43:30.570Z
2020-12-29T09:43:30.568Z
2020-12-29T09:43:30.529Z
2020-12-29T09:43:30.475Z
2020-12-29T09:43:30.474Z
2020-12-29T09:43:30.468Z
2020-12-29T09:43:30.418Z
2020-12-29T09:43:30.417Z
...
# 840-850
2020-12-29T09:43:27.953Z
2020-12-29T09:43:27.929Z
2020-12-29T09:43:27.927Z
2020-12-29T09:43:27.920Z
2020-12-29T09:43:27.897Z
2020-12-29T09:43:27.895Z
2020-12-29T09:43:27.886Z
2020-12-29T09:43:27.861Z
2020-12-29T09:43:27.860Z
2020-12-29T09:43:27.853Z
2020-12-29T09:43:27.828Z
...
# Last 3
2020-12-29T09:43:25.878Z
2020-12-29T09:43:25.876Z
2020-12-29T09:43:25.869Z
Есть некоторые соображения по использованию search_after, как описано в документации по API:
Если между этими запросами происходит обновление, порядок ваших результатов может измениться, что приведет к несогласованности результатов на разных страницах. Чтобы предотвратить это, вы можете создать момент времени (PIT), чтобы сохранить текущее состояние индекса во время ваших поисков.
extra
'pit': {'id':xxxx, 'keep_alive':5m}
к каждому запросу.Мы рекомендуем включить в сортировку поле разрешения конфликтов. Это поле разрешения конфликтов должно содержать уникальное значение для каждого документа. Если вы не включите поле для разрешения конфликтов, в выводимых на страницы результатах могут отсутствовать или дублироваться совпадения.
# Add some ID as a tiebreaker to the `sort` call
search = search.sort(
{'@timestamp': {
'order': 'desc'
}},
{'some.id': {
'order': 'desc'
}}
)
# Include both the sort ID and the some.ID in `search_after`
last_hit_sort_id, last_hit_route_id = hits[-1].meta.sort
search = search.extra(search_after=[last_hit_sort_id, last_hit_route_id])
Спасибо Джино Мемпин. Оно работает!
Но я также понял, что простое изменение делает ту же работу.
добавив .params(preserve_order=True)
elasticsearch, вы отсортируете все данные.
es = Search(index = "somelog-*").using(client)
notifications = (es
.query("range", **{
"@timestamp": {
'gte': 'now-48h',
'lt' : 'now'
}
})
.sort("@timestamp")
.params(preserve_order=True)
.scan()
)