Logstash извлекает данные из разных типов сообщений

Ниже приведены 3 примера типов журналов, которые я получаю от нашей платформы автоматизации. Я хочу извлечь раздел customOptions. Проблема, с которой я сталкиваюсь, заключается в том, что в разделе пользовательских параметров их может быть много. Я думаю, что мне нужно сделать, это разделить массив пользовательских параметров, а затем проанализировать его. Я пробовал logstash dissect, grok и mutate и изо всех сил пытался получить эти данные.

2020-12-09_18:06:30.58027 executing local task [refId:3122, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3122, jobTemplateId:3122, jobDate:1607537190133, userId:1897, customConfig:{"AnsibleRequestedUser":"testing1","AnsibleRequestedUserPassword":"VMware321!"}, jobTemplateExecutionId:5677, customInputs:[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]], processConfig:[accountId:947, status:executing, username:user1, userId:1897, userDisplayName:user1 user1, refType:jobTemplate, refId:3122, timerCategory:TEST: 0.  Enterprise Create User, timerSubCategory:3122, description: Enterprise Create User], processMap:[success:true, refType:jobTemplate, refId:3122, subType:null, subId:null, process: : 25172, timerCategory:TEST: 0. OpenManage Enterprise Create User, timerSubCategory:3122, zoneId:null, processId:25172], taskConfig:[:],:@45eb737f]



2020-12-09_15:33:43.21913 executing local task [refId:3117, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3117, jobTemplateId:3117, jobDate:1607528023018, userId:320, customConfig:null, jobTemplateExecutionId:5667, customInputs:[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123]], processConfig:[accountId:2, status:executing, username:[email protected], userId:320, userDisplayName:user, refType:jobTemplate, refId:3117, timerCategory:TEST: 2.  Enterprise - Create Identity Pool, timerSubCategory:3117, description:TEST: 2. Enterprise - Create Identity Pool], processMap:[success:true, refType:jobTemplate, refId:3117, subType:null, subId:null, process: : 25147, timerCategory:TEST: 2. Enterprise - Create Identity Pool, timerSubCategory:3117, zoneId:null, processId:25147], taskConfig:[:], :@21ff5f47]



2020-12-09_15:30:53.83030 executing local task [refId:3112, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3112, jobTemplateId:3112, jobDate:1607527853230, userId:320, customConfig:null, jobTemplateExecutionId:5662, customInputs:[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf]], processConfig:[accountId:2, status:executing, username:[email protected], userId:320, userDisplayName:user, refType:jobTemplate, refId:3112, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, description:TEST: 1. Enterprise - Create Template From Reference Device], processMap:[success:true, refType:jobTemplate, refId:3112, subType:null, subId:null, process: : 25142, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, zoneId:null, processId:25142], taskConfig:[:],:@29ac1e41]

Данные нужно взять следующие из сообщений выше.

Сообщение 1:

[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]] Я бы хотел, чтобы они были в новое поле. имя пользователя: user1 должно быть указано в поле. timerCategory:TEST: 0. Enterprise Create User должен иметь это в поле.

Остальные данные могут оставаться в исходном поле сообщения.

Сообщение 2:

[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount: 50, TrackingUseCase: демонстрация/обучение клиентов, AnsiblePoolName:asdf123]] — мне нужно разделить их на разные поля. имя пользователя:[email protected] должно быть полем. timerCategory: ТЕСТ: 2. Предприятие — Создать пул удостоверений, — мне нужно поле.

Сообщение 3:

[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Тестирование/обучение, ReferenceServerTemplateDescription:asdfasdf]], - I нужно разделить их на отдельные поля. имя пользователя: [email protected]

  • должно быть поле. timerCategory:TEST: 1. Предприятие — Создать шаблон из эталонного устройства — должно быть полем.

Имейте в виду, что категория таймера будет постоянно меняться в зависимости от того, что выдает журнал, но должна оставаться в том же формате, что и выше. Пользовательские параметры будут постоянно меняться — это означает, что в зависимости от запуска автоматизации будут определяться дополнительные пользовательские параметры, но опять же формат, указанный выше, должен оставаться прежним. Имя пользователя может быть адресом электронной почты или общим.

Вот некоторые из фильтров тайника журнала, которые я пробовал с некоторым успехом, но не для обработки изменяющегося характера сообщения журнала.

# Testing a new method to get information from the logs. 
#if "executing local task" in [message] and "beats" in [tags]{
#   dissect {
#       mapping => {
#           "message" => "%{date} %{?skip1} %{?skip2} %{?skip3} %{?refid} %{?lockTimeout} %{?lockTtl} %{?jobtemplate} %{?jobType} %{?jobTemplateId} %{?jobDate} %{?userId} %{?jobTemplateExecutionId} %{?jobTemplateExecutionId1} customInputs:[customOptions:[%{?RequestedPassword}:%{?RequestedPassword} %{?TrackingUseCase1}:%{TrackingUseCase}, %{?RequestedUser}, %{?processConfig}, %{?status}, username:%{username}, %{?userId}, %{?userDisplayName}, %{?refType}, %{?refID}, %{?timerCategory}:%{TaskName}, %{?timeCat}, %{?description}, %{?extra}"
#       }
#   }
#}
# Testing Grok Filters instead.  
if "executing local task" in [messages] and "beats" in [tags]{
    grok {
        match => { "message" => "%{YEAR:year}-%{MONTHNUM2:month}-%{MONTHDAY:day}_%{TIME:time}%{SPACE}%{CISCO_REASON}%{SYSLOG5424PRINTASCII}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{SYSLOGPROG}%{SYSLOG5424SD:testing3}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing2}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing}%{GREEDYDATA}}"
        }
    }   
}

Я думаю, что мне нужно использовать grok, но я не знаком с тем, как разделять/добавлять поля для удовлетворения вышеуказанных потребностей.

Любая помощь будет принята с благодарностью.

0
0
344
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Я не рекомендую пытаться делать все в одном фильтре, особенно в одном шаблоне grok. Я бы начал с использования dissect для удаления метки времени. Я сохраняю его в поле [@metadata], чтобы он был доступен в конвейере logstash, но не обрабатывался выводом.

    dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{} [%{[@metadata][restOfline]}" } }
    date { match => [ "[@metadata][timestamp]", "YYYY-MM-dd_HH:mm:ss.SSSSS" ] }

Затем я бы разбил restOfLine, используя шаблоны grok. Если вам нужны только поля из processConfig, то это единственный шаблон grok, который вам нужен. Я привожу остальные в качестве примера того, как извлечь несколько шаблонов из одного сообщения.

    grok {
        break_on_match => false
        match => {
            "[@metadata][restOfline]" => [
                "customOptions:\[(?<[@metadata][customOptions]>[^\]]+)",
                "processConfig:\[(?<[@metadata][processConfig]>[^\]]+)",
                "processMap:\[(?<[@metadata][processMap]>[^\]]+)"
            ]
        }
    }

Теперь мы можем разобрать [@metadata][processConfig], который представляет собой строку ключ/значение. Снова сохраняем проанализированные значения в [@metadata] и просто копируем нужные.

    kv {
        source => "[@metadata][processConfig]"
        target => "[@metadata][processConfigValues]"
        field_split_pattern => ", "
        value_split => ":"
        add_field => {
            "username" => "%{[@metadata][processConfigValues][username]}"
            "timeCategory" => "%{[@metadata][processConfigValues][timerCategory]}"
         }
    }

Это приводит к событиям с такими полями, как

    "username" => "[email protected]",
"timeCategory" => "TEST: 2.  Enterprise - Create Identity Pool"

использование предоставленного вами набора данных не дало никаких результатов. это почти так, как будто он прошел фильтр все вместе. если "выполнение локальной задачи" в [сообщениях] и "beats_morpheus" в [тегах] - это начало раздела фильтра, который вы указали выше. Я сломал их, как вы их.

Jared 10.12.2020 22:12

служба logstash запускается, поэтому синтаксис кажется правильным, и она продолжает работать. я отправил ему 3 новых журнала и не получил никаких новых полей.

Jared 10.12.2020 22:14

Я думаю, что нашел свою ошибку - если «выполнение локальной задачи» в [сообщениях] должно быть сообщение. Снова тестирование.

Jared 10.12.2020 22:19

Итак, это устранило проблему - могу ли я сделать то же самое с customOptions - просто настроить другой KV и добавить туда известные поля, которые я хочу, например TrackingUseCase, если я хочу выйти из этого раздела KV?

Jared 10.12.2020 22:23

Да просто используйте другой кв.

Badger 10.12.2020 22:27

ОК, когда я это делаю, я не получаю KV в пользовательских параметрах. Следуя вашему примеру с именем пользователя и категорией. ``` kv { source => "[@metadata][customOptions]" target => "[@metadata][customOptionsValues]" field_split_pattern => ", " value_split => ":" add_field => { "TrackingUseCase" => "%{[@metadata][customOptionsValues][TrackingUseCase]}" } }```

Jared 10.12.2020 22:33

Если он не найдет значение ключа, он не потерпит неудачу, верно, он просто добавит ноль?

Jared 10.12.2020 22:38

trackingUseCase %{[@metadata][customOptionsValues][TrackingUseCase] ​​--> хорошо, похоже, работает, моя собственная глупость допустила ошибку в синтаксисе. - но похоже, что файл будет заполнен, если данные не будут найдены, он просто вводит строку отслеживания. придется найти способ отфильтровать это позже. Спасибо за помощь, это заставляет меня хотя бы начать.

Jared 10.12.2020 22:43

Это еще один ответ, посвященный гроку (но я согласен, что его немного сложно поддерживать во времени, а также просто понять в настоящем).

  1. Извлеките с правильным (и немного длинным) выражением grok поле customOptions
  2. Работайте только с этим конкретным полем с другим фильтром (ключевое значение) и поместите, например, в поле customOptionsSplitter (чтобы не сломать существующее поле).

Этот код является реализацией этого:

filter{

    grok {
        match => { "message" => "%{DATE:date}_%{TIME:time} %{CISCO_REASON} \[refId\:%{INT:refId}, lockTimeout:%{INT:lockTimeout}, lockTtl:%{INT:lockTtl}, jobType:%{NOTSPACE:jobType}, lockId:%{NOTSPACE:lockId}, jobTemplateId:%{INT:jobTemplateId}, jobDate:%{INT:jobDate}, userId:%{INT:userId}, customConfig:(\{%{GREEDYDATA:customConfig}\}|null), jobTemplateExecutionId:%{INT:jobTemplateExecutionId}, customInputs:\[customOptions:\[%{GREEDYDATA:customOptions}\]\], processConfig:\[%{GREEDYDATA:processConfig}\], processMap:\[%{GREEDYDATA:processMap}\], taskConfig:\[%{GREEDYDATA:taskConfig}\], :%{NOTSPACE:serial}\]"
        }
    }

    kv {
        source => "customOptions"
        target => "customOptionsSplitter"
        field_split_pattern => ", "
        value_split => ":"
    }

}

Другие вопросы по теме