Как сортировать журналы с разбивкой на страницы по @timestamp с помощью Elasticsearch?

Моя цель — отсортировать миллионы журналов по отметке времени, которую я получаю от Elasticsearch.

Примеры журналов:

{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}

К сожалению, я не могу отсортировать все журналы Elastic. Кажется, я должен сделать это сам.

Подходы, которые я пытался получить, отсортировали данные из эластичности:

es = Search(index = "somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
    print(hit['@timestamp'])

Другой подход:

notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .scan()
)

Поэтому я ищу способ отсортировать эти журналы самостоятельно или напрямую через Elasticsearch. В настоящее время я сохраняю все данные в локальном «logs.json», и мне кажется, что мне нужно перебрать и отсортировать их самостоятельно.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
2 607
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы обязательно должны позволить Elasticsearch выполнить сортировку, а затем вернуть вам уже отсортированные данные.

Проблема в том, что вы используете .scan() . Он использует API сканирования/прокрутки Elasticsearch, который, к сожалению, применяет параметры сортировки только к каждой странице/фрагменту, а не ко всему результату поиска. Это отмечено в документах elasticsearch-dsl по нумерации страниц:

Пагинация

...
Если вы хотите получить доступ ко всем документам, соответствующим вашему запросу, вы можете используйте метод сканирования, который использует API эластичного поиска сканирования/прокрутки:

for hit in s.scan():
    print(hit.title)

Обратите внимание, что в этом случае результаты не будут сортироваться.

(emphasis mine)

Использование разбиения на страницы, безусловно, является вариантом, особенно когда у вас есть «миллионы журналов», как вы сказали. Существует API разбивки на страницы search_after:

Искать после

Вы можете использовать параметр search_after для получения следующей страницы хиты, используя набор значений сортировки с предыдущей страницы.
...
Чтобы получить первую страницу результатов, отправьте поисковый запрос с sort аргумент.
...
Ответ поиска включает массив значений sort для каждый удар.
...
Чтобы получить следующую страницу результатов, повторите предыдущий поиск, используя значения sort последнего совпадения в качестве аргумента search_after. ... Аргументы поиска query и sort должны остаться без изменений. Если предоставлен, аргумент from должен быть 0 (по умолчанию) или -1.
...
Вы можете повторить этот процесс, чтобы получить дополнительные страницы результатов.

(omitted the raw JSON requests since I'll show a sample in Python below)

Вот пример того, как это сделать с помощью elasticsearch-dsl для Python. Обратите внимание, что я ограничиваю fields и количество результатов, чтобы упростить тестирование. Важными частями здесь являются sort и extra(search_after=).

search = Search(using=client, index='some-index')

# The main query
search = search.extra(size=100)
search = search.query('range', **{'@timestamp': {'gte': '2020-12-29T09:00', 'lt': '2020-12-29T09:59'}})
search = search.source(fields=('@timestamp', ))
search = search.sort({
    '@timestamp': {
        'order': 'desc'
    },
})

# Store all the results (it would be better to be wrap all this in a generator to be performant)
hits = []

# Get the 1st page
results = search.execute()
hits.extend(results.hits)
total = results.hits.total
print(f'Expecting {total}')

# Get the next pages
# Real use-case condition should be "until total" or "until no more results.hits"
while len(hits) < 1000:  
    print(f'Now have {len(hits)}')
    last_hit_sort_id = hits[-1].meta.sort[0]
    search = search.extra(search_after=[last_hit_sort_id])
    results = search.execute()
    hits.extend(results.hits)

with open('results.txt', 'w') as out:
    for hit in hits:
        out.write(f'{hit["@timestamp"]}\n')

Это приведет к уже отсортированным данным:

# 1st 10 lines
2020-12-29T09:58:57.749Z
2020-12-29T09:58:55.736Z
2020-12-29T09:58:53.627Z
2020-12-29T09:58:52.738Z
2020-12-29T09:58:47.221Z
2020-12-29T09:58:45.676Z
2020-12-29T09:58:44.523Z
2020-12-29T09:58:43.541Z
2020-12-29T09:58:40.116Z
2020-12-29T09:58:38.206Z
...
# 250-260
2020-12-29T09:50:31.117Z
2020-12-29T09:50:27.754Z
2020-12-29T09:50:25.738Z
2020-12-29T09:50:23.601Z
2020-12-29T09:50:17.736Z
2020-12-29T09:50:15.753Z
2020-12-29T09:50:14.491Z
2020-12-29T09:50:13.555Z
2020-12-29T09:50:07.721Z
2020-12-29T09:50:05.744Z
2020-12-29T09:50:03.630Z 
...
# 675-685
2020-12-29T09:43:30.609Z
2020-12-29T09:43:30.608Z
2020-12-29T09:43:30.602Z
2020-12-29T09:43:30.570Z
2020-12-29T09:43:30.568Z
2020-12-29T09:43:30.529Z
2020-12-29T09:43:30.475Z
2020-12-29T09:43:30.474Z
2020-12-29T09:43:30.468Z
2020-12-29T09:43:30.418Z
2020-12-29T09:43:30.417Z
...
# 840-850
2020-12-29T09:43:27.953Z
2020-12-29T09:43:27.929Z
2020-12-29T09:43:27.927Z
2020-12-29T09:43:27.920Z
2020-12-29T09:43:27.897Z
2020-12-29T09:43:27.895Z
2020-12-29T09:43:27.886Z
2020-12-29T09:43:27.861Z
2020-12-29T09:43:27.860Z
2020-12-29T09:43:27.853Z
2020-12-29T09:43:27.828Z
...
# Last 3
2020-12-29T09:43:25.878Z
2020-12-29T09:43:25.876Z
2020-12-29T09:43:25.869Z 

Есть некоторые соображения по использованию search_after, как описано в документации по API:

  • Используйте параметр Point In Time или PIT
    • Если между этими запросами происходит обновление, порядок ваших результатов может измениться, что приведет к несогласованности результатов на разных страницах. Чтобы предотвратить это, вы можете создать момент времени (PIT), чтобы сохранить текущее состояние индекса во время ваших поисков.

    • Вам нужно сначала сделать POST-запрос, чтобы получить PIT ID
    • Затем добавьте параметр extra'pit': {'id':xxxx, 'keep_alive':5m} к каждому запросу.
    • Обязательно используйте идентификатор PIT из последнего ответа.
  • Используйте тай-брейк
    • Мы рекомендуем включить в сортировку поле разрешения конфликтов. Это поле разрешения конфликтов должно содержать уникальное значение для каждого документа. Если вы не включите поле для разрешения конфликтов, в выводимых на страницы результатах могут отсутствовать или дублироваться совпадения.

    • Это будет зависеть от вашей схемы документа
      # Add some ID as a tiebreaker to the `sort` call
      search = search.sort(
          {'@timestamp': {
              'order': 'desc'
          }},
          {'some.id': {
              'order': 'desc'
          }}
      )
      
      # Include both the sort ID and the some.ID in `search_after`
      last_hit_sort_id, last_hit_route_id = hits[-1].meta.sort
      search = search.extra(search_after=[last_hit_sort_id, last_hit_route_id])
      
Ответ принят как подходящий

Спасибо Джино Мемпин. Оно работает!

Но я также понял, что простое изменение делает ту же работу.

добавив .params(preserve_order=True) elasticsearch, вы отсортируете все данные.

es = Search(index = "somelog-*").using(client)
notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .params(preserve_order=True)
    .scan()
)

Другие вопросы по теме