Как получить поле из массива объектов в индексе Elasticsearch в виде файла CSV в облачное хранилище Google с помощью Logstash

Я использую ElasticSearch для индексации данных и хотел экспортировать несколько полей из индекса, создаваемого каждый день, в облачное хранилище Google. Как получить поля из массива объектов в индексе эластичного поиска и отправить их в виде CSV-файла в корзину GCS с помощью Logstash

Пробовал ниже conf для извлечения вложенных полей из индекса:

input {

 elasticsearch {

    hosts => "host:443"

    user => "user"

    ssl => true

    connect_timeout_seconds => 600

    request_timeout_seconds => 600

    password => "pwd"

    ca_file => "ca.crt"

    index => "test"

    query => '

    {
    "_source": ["obj1.Name","obj1.addr","obj1.obj2.location", "Hierarchy.categoryUrl"],

    "query": {

    "match_all": {}

    }

    }

  '

  }

}

filter {
mutate {
    rename => {
        "[obj1][Name]" => "col1"
        "[obj1][addr]" => "col2"
        "[obj1][obj2][location]" => "col3"
        "[Hierarchy][0][categoryUrl]" => "col4"
    }
  }
 }


output {
   google_cloud_storage {
   codec => csv {
    include_headers => true
    columns => [ "col1", "col2","col3"]
   }
     bucket => "bucket"
     json_key_file => "creds.json"
     temp_directory => "/tmp"
     log_file_prefix => "log_gcs"
     max_file_size_kbytes => 1024
     date_pattern => "%Y-%m-%dT%H:00"
     flush_interval_secs => 600
     gzip => false
     uploader_interval_secs => 600
     include_uuid => true
     include_hostname => true
   }
}

Как получить поле, заполненное выше csv из массива объектов, в приведенном ниже примере нужно получить categoryUrl из первого объекта массива и заполнить таблицу csv и отправить ее в корзину GCS:

пробовали следующие подходы:

"_source": ["obj1.Name","obj1.addr","obj1.obj2.location", "Hierarchy.categoryUrl"]

и

  "_source": ["obj1.Name","obj1.addr","obj1.obj2.location", "Hierarchy[0].categoryUrl"]

с

mutate {
    rename => {
        "[obj1][Name]" => "col1"
        "[obj1][addr]" => "col2"
        "[obj1][obj2][location]" => "col3"
        "[Hierarchy][0][categoryUrl]" => "col4"
}
    

для входного образца:

"Hierarchy" : [
            {
              "level" : "1",
              "category" : "test",
              "categoryUrl" : "testurl1"
            },
            {
              "level" : "2",
              "category" : "test2",
              "categoryUrl" : "testurl2"
            }}

Прикрепляю образец документа, в котором я пытаюсь получить merchandisingHierarchy[0].categoryUrl и PriceInfo[0].basePrice :

{
        "_index" : "amulya-test",
        "_type" : "_doc",
        "_id" : "ldZPJoYBFi8LOEDK_M2f",
        "_score" : 1.0,
        "_ignored" : [
          "itemDetails.description.keyword"
        ],
        "_source" : {
          "itemDetails" : {
            "compSku" : "202726",
            "compName" : "abc.com",
            "compWebsite" : "abc.com",
            "title" : "Monteray 38.25 in. x 73.375 in. Frameless Hinged Corner Shower Enclosure in Brushed Nickel",
            "description" : "Create the modthroom of your dreams with the clean lines of the VIGO Monteray Frameless Shower Enclosure. Solid 3/8 in. tempered glass combined with stainless steel and solid brass construction makes this enclosure strong and long-lasting. The sleek, reversible, outward-opening door features a convenient towel bar. This versatile enclosure can be installed on a tile floor or with a VIGO Shower Base. With a single water deflector along the bottom seal strip, water is redirected back into the shower to keep your bathroom dry, clean, and pristine.",
            "modelNumber" : "VG6011BNCL40",
            "upc" : "8137756684",
            "hasVariations" : false,
            "productDetailsBulletPoints" : [ ],
            "itemUrls" : {
              "productPageUrl" : "https://.abc.com/p/VIGO-Monteray-38-in-x-73-375-in-Frameless-Hinged-Corner-Shower-Enclosure-in-Brushed-Nickel-VG6011BNCL40/202722616",
              "primaryImageUrl" : "https://images.thdstatic.com/productImages/d77d9e8b-1ea1-4811-a470-8364c8e47402/svn/vigo-shower-enclosures-vg6011bncl40-64_600.jpg",
              "secondaryImageUrls" : [
                "https://images.thdstatic.com/productImages/d77d9e8b-1e1-4811-a470-8364c8e47402/svn/vigo-shower-enclosures-vg6011bncl40-64_1000.jpg",
                "https://images.thdstatic.com/productImages/db539ff9-6df-48c2-897a-18dd1e1794e3/svn/vigo-shower-enclosures-vg6011bncl40-e1_1000.jpg",
                "https://images.thdstatic.com/productImages/47c5090b-49a-46bc-a36d-921ddae5e1ab/svn/vigo-shower-enclosures-vg6011bncl40-40_1000.jpg",
                "https://images.thdstatic.com/productImages/add6691c-a02-466d-9a1a-47200b05685e/svn/vigo-shower-enclosures-vg6011bncl40-a0_1000.jpg",
                "https://images.thdstatic.com/productImages/d638230e-0d9-40c9-be93-7f7bf24f0732/svn/vigo-shower-enclosures-vg6011bncl40-1d_1000.jpg"
              ]
            }
          },
          "merchandisingHierarchy" : [
            {
              "level" : "1",
              "category" : "Home",
              "categoryUrl" : "host/"
            },
            {
              "level" : "2",
              "category" : "Bath",
              "categoryUrl" : "host/b/Bath/N-5yc1vZbzb3"
            },
            {
              "level" : "3",
              "category" : "Showers",
              "categoryUrl" : "host/b/Bath-Showers/N-5yc1vZbzcd"
            },
            {
              "level" : "4",
              "category" : "Shower Doors",
              "categoryUrl" : "host/b/Bath-Showers-Shower-Doors/N-5yc1vZbzcg"
            },
            {
              "level" : "5",
              "category" : "Shower Enclosures",
              "categoryUrl" : "host/b/Bath-Showers-Shower-Doors-Shower-Enclosures/N-5yc1vZcbn2"
            }
          ],
          "reviewsAndRatings" : {
            "pdtReviewCount" : 105
          },
          "additionalAttributes" : {
            "isAddon" : false
          },
          "productSpecifications" : {
            "Warranties" : { },
            "Details" : { },
            "Dimensions" : { }
          },
          "promoDetails" : [
            {
              "promoName" : "Save $150.00 (15%)",
              "promoPrice" : 849.9
            }
          ],
          "locationDetails" : { },
          "storePickupDetails" : {
            "deliveryText" : "Get it by Mon, Feb 20",
            "toEddDate" : "Mon, Feb 20",
            "isBackordered" : false,
            "selectedEddZipcode" : "20147",
            "shipToStoreEnabled" : true,
            "homeDeliveryEnabled" : true,
            "scheduledDeliveryEnabled" : false
          },
          "recommendedProducts" : [ ],
          "pricingInfo" : [
            {
              "type" : "SAS",
              "offerPrice" : 849.9,
              "sellerName" : "abc.com",
              "onClearance" : false,
              "inStock" : true,
              "isBuyBoxWinner" : true,
              "promo" : [
                {
                  "onPromo" : true,
                  "promoName" : "Save $150.00 (15%)",
                  "promoPrice" : 849.9
                }
              ],
              "basePrice" : 999.9,
              "priceVariants" : [
                {
                  "basePrice" : 999.9,
                  "offerPrice" : 849.9
                }
              ],
              "inventoryDetails" : {
                "stockInStore" : false,
                "stockOnline" : true
              }
            }
          ]
        }
      }

0
0
67
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете сделать это следующим образом:

input {
 elasticsearch {
    ...
    query => '
    {
    "_source": ["merchandisingHierarchy.categoryUrl"],
     "query": {
      "match_all": {}
     }
    }
  '
 }
}
filter {
    mutate {
        add_field => {
            "col1" => "%{[merchandisingHierarchy][0][categoryUrl]}"
            "col2" => "%{[pricingInfo][0][basePrice]}"
        }
    }
}

output {
    stdout {
       codec => csv {
        include_headers => true
        columns => [ "col1"]
       }
    }
}

Я протестировал ваш образец документа и получил приведенный ниже результат, который, похоже, работает в соответствии с вашими ожиданиями:

col1,col2
host/,999.9

Привет @Val Большое спасибо за ваш ответ, попробовал запрос выше, он не сработал, обновил вопрос с запросами, которые пробовали

Amulya M 06.02.2023 16:02

В _source нужно указать только Hierarchy, а не Hierarchy[0]

Val 06.02.2023 16:15

Привет @Val, пробовал оба способа, все равно не работает, для col4 данные не заполняются в csv

Amulya M 06.02.2023 16:29

Какая у вас версия? Я только что протестировал его на Logstash 7.16, и он работает.

Val 06.02.2023 16:54

Можете ли вы показать образец документа?

Val 06.02.2023 17:13

Привет @Val, пожалуйста, найдите прикрепленный образец документа выше

Amulya M 06.02.2023 17:47

Спасибо, я перепроверил и обновил свой ответ

Val 06.02.2023 18:03

Большое спасибо @Val, это сработало !!, потому что вывод basePrice печатает в этом формате 0,9999e3, должно быть 999,9, в чем может быть проблема

Amulya M 07.02.2023 06:44

Я обновил свой ответ, чтобы сделать это немного по-другому

Val 07.02.2023 07:29

Привет @Val, большое спасибо, это сработало !!, для чего используется %{} в приведенном выше запросе, когда использовать add_field и когда использовать переименование в фильтре logstash

Amulya M 07.02.2023 07:45

Круто, рад, что помогло! %{...}упорядочивает значение. add_field создает новое поле из других полей и rename просто переименовывает существующее

Val 07.02.2023 07:53

Привет @Val, спасибо за разъяснение, если базовой цены не существует, то мы получаем %{[pricingInfo][0][basePrice]} на выходе. мы можем использовать так: filter { mutate { rename => { "[pricingInfo][0][basePrice]" => "list_price" } convert => { "list_price" => "float" }

Amulya M 07.02.2023 08:03

Давайте, может быть, создадим новую тему, так как это уже другая проблема, если вы не против.

Val 07.02.2023 08:41

Привет @Val, проблема решена после использования вышеуказанного запроса с convert => { "list_price" => "float" }. не могли бы вы предложить, есть ли у вас какие-либо материалы по этому запросу: stackoverflow.com/questions/75312170/…

Amulya M 07.02.2023 11:33

Другие вопросы по теме

Эмуляторы angular firebase для хранилища и firestore, не используемые приложением
Com.google.auth.oauth2.GoogleAuthException: ошибка при получении токена доступа для сервисного аккаунта: поток закрыт. при загрузке изображения в облачное хранилище
Чтение аудио/mp3-файла из корзины GCP заблокировано политикой CORS: отсутствует заголовок «Access-Control-Allow-Origin»
GCS: Можем ли мы иметь разные объекты Storage Class внутри ведра?
Агент не отображается в пользовательском интерфейсе (служба передачи хранилища GCP)
Как загрузить URL-адрес изображения base64 в Firebase и получить его токен доступа?
Как указать определенный файл в облачном хранилище Google для запуска облачной функции Google
Возврат файла из Google Cloud Storage через REST API без предварительной загрузки локально
Python — вернуть Google Cloud Storage — вернуть файл
Как написать облачную функцию для прослушивания и реагирования на такие события, как создание, изменение или удаление файла?