Я использую ElasticSearch для индексации данных и хотел экспортировать несколько полей из индекса, создаваемого каждый день, в облачное хранилище Google. Как получить поля из массива объектов в индексе эластичного поиска и отправить их в виде CSV-файла в корзину GCS с помощью Logstash
Пробовал ниже conf для извлечения вложенных полей из индекса:
input {
elasticsearch {
hosts => "host:443"
user => "user"
ssl => true
connect_timeout_seconds => 600
request_timeout_seconds => 600
password => "pwd"
ca_file => "ca.crt"
index => "test"
query => '
{
"_source": ["obj1.Name","obj1.addr","obj1.obj2.location", "Hierarchy.categoryUrl"],
"query": {
"match_all": {}
}
}
'
}
}
filter {
mutate {
rename => {
"[obj1][Name]" => "col1"
"[obj1][addr]" => "col2"
"[obj1][obj2][location]" => "col3"
"[Hierarchy][0][categoryUrl]" => "col4"
}
}
}
output {
google_cloud_storage {
codec => csv {
include_headers => true
columns => [ "col1", "col2","col3"]
}
bucket => "bucket"
json_key_file => "creds.json"
temp_directory => "/tmp"
log_file_prefix => "log_gcs"
max_file_size_kbytes => 1024
date_pattern => "%Y-%m-%dT%H:00"
flush_interval_secs => 600
gzip => false
uploader_interval_secs => 600
include_uuid => true
include_hostname => true
}
}
Как получить поле, заполненное выше csv из массива объектов, в приведенном ниже примере нужно получить categoryUrl из первого объекта массива и заполнить таблицу csv и отправить ее в корзину GCS:
пробовали следующие подходы:
"_source": ["obj1.Name","obj1.addr","obj1.obj2.location", "Hierarchy.categoryUrl"]
и
"_source": ["obj1.Name","obj1.addr","obj1.obj2.location", "Hierarchy[0].categoryUrl"]
с
mutate {
rename => {
"[obj1][Name]" => "col1"
"[obj1][addr]" => "col2"
"[obj1][obj2][location]" => "col3"
"[Hierarchy][0][categoryUrl]" => "col4"
}
для входного образца:
"Hierarchy" : [
{
"level" : "1",
"category" : "test",
"categoryUrl" : "testurl1"
},
{
"level" : "2",
"category" : "test2",
"categoryUrl" : "testurl2"
}}
Прикрепляю образец документа, в котором я пытаюсь получить merchandisingHierarchy[0].categoryUrl и PriceInfo[0].basePrice :
{
"_index" : "amulya-test",
"_type" : "_doc",
"_id" : "ldZPJoYBFi8LOEDK_M2f",
"_score" : 1.0,
"_ignored" : [
"itemDetails.description.keyword"
],
"_source" : {
"itemDetails" : {
"compSku" : "202726",
"compName" : "abc.com",
"compWebsite" : "abc.com",
"title" : "Monteray 38.25 in. x 73.375 in. Frameless Hinged Corner Shower Enclosure in Brushed Nickel",
"description" : "Create the modthroom of your dreams with the clean lines of the VIGO Monteray Frameless Shower Enclosure. Solid 3/8 in. tempered glass combined with stainless steel and solid brass construction makes this enclosure strong and long-lasting. The sleek, reversible, outward-opening door features a convenient towel bar. This versatile enclosure can be installed on a tile floor or with a VIGO Shower Base. With a single water deflector along the bottom seal strip, water is redirected back into the shower to keep your bathroom dry, clean, and pristine.",
"modelNumber" : "VG6011BNCL40",
"upc" : "8137756684",
"hasVariations" : false,
"productDetailsBulletPoints" : [ ],
"itemUrls" : {
"productPageUrl" : "https://.abc.com/p/VIGO-Monteray-38-in-x-73-375-in-Frameless-Hinged-Corner-Shower-Enclosure-in-Brushed-Nickel-VG6011BNCL40/202722616",
"primaryImageUrl" : "https://images.thdstatic.com/productImages/d77d9e8b-1ea1-4811-a470-8364c8e47402/svn/vigo-shower-enclosures-vg6011bncl40-64_600.jpg",
"secondaryImageUrls" : [
"https://images.thdstatic.com/productImages/d77d9e8b-1e1-4811-a470-8364c8e47402/svn/vigo-shower-enclosures-vg6011bncl40-64_1000.jpg",
"https://images.thdstatic.com/productImages/db539ff9-6df-48c2-897a-18dd1e1794e3/svn/vigo-shower-enclosures-vg6011bncl40-e1_1000.jpg",
"https://images.thdstatic.com/productImages/47c5090b-49a-46bc-a36d-921ddae5e1ab/svn/vigo-shower-enclosures-vg6011bncl40-40_1000.jpg",
"https://images.thdstatic.com/productImages/add6691c-a02-466d-9a1a-47200b05685e/svn/vigo-shower-enclosures-vg6011bncl40-a0_1000.jpg",
"https://images.thdstatic.com/productImages/d638230e-0d9-40c9-be93-7f7bf24f0732/svn/vigo-shower-enclosures-vg6011bncl40-1d_1000.jpg"
]
}
},
"merchandisingHierarchy" : [
{
"level" : "1",
"category" : "Home",
"categoryUrl" : "host/"
},
{
"level" : "2",
"category" : "Bath",
"categoryUrl" : "host/b/Bath/N-5yc1vZbzb3"
},
{
"level" : "3",
"category" : "Showers",
"categoryUrl" : "host/b/Bath-Showers/N-5yc1vZbzcd"
},
{
"level" : "4",
"category" : "Shower Doors",
"categoryUrl" : "host/b/Bath-Showers-Shower-Doors/N-5yc1vZbzcg"
},
{
"level" : "5",
"category" : "Shower Enclosures",
"categoryUrl" : "host/b/Bath-Showers-Shower-Doors-Shower-Enclosures/N-5yc1vZcbn2"
}
],
"reviewsAndRatings" : {
"pdtReviewCount" : 105
},
"additionalAttributes" : {
"isAddon" : false
},
"productSpecifications" : {
"Warranties" : { },
"Details" : { },
"Dimensions" : { }
},
"promoDetails" : [
{
"promoName" : "Save $150.00 (15%)",
"promoPrice" : 849.9
}
],
"locationDetails" : { },
"storePickupDetails" : {
"deliveryText" : "Get it by Mon, Feb 20",
"toEddDate" : "Mon, Feb 20",
"isBackordered" : false,
"selectedEddZipcode" : "20147",
"shipToStoreEnabled" : true,
"homeDeliveryEnabled" : true,
"scheduledDeliveryEnabled" : false
},
"recommendedProducts" : [ ],
"pricingInfo" : [
{
"type" : "SAS",
"offerPrice" : 849.9,
"sellerName" : "abc.com",
"onClearance" : false,
"inStock" : true,
"isBuyBoxWinner" : true,
"promo" : [
{
"onPromo" : true,
"promoName" : "Save $150.00 (15%)",
"promoPrice" : 849.9
}
],
"basePrice" : 999.9,
"priceVariants" : [
{
"basePrice" : 999.9,
"offerPrice" : 849.9
}
],
"inventoryDetails" : {
"stockInStore" : false,
"stockOnline" : true
}
}
]
}
}
Вы можете сделать это следующим образом:
input {
elasticsearch {
...
query => '
{
"_source": ["merchandisingHierarchy.categoryUrl"],
"query": {
"match_all": {}
}
}
'
}
}
filter {
mutate {
add_field => {
"col1" => "%{[merchandisingHierarchy][0][categoryUrl]}"
"col2" => "%{[pricingInfo][0][basePrice]}"
}
}
}
output {
stdout {
codec => csv {
include_headers => true
columns => [ "col1"]
}
}
}
Я протестировал ваш образец документа и получил приведенный ниже результат, который, похоже, работает в соответствии с вашими ожиданиями:
col1,col2
host/,999.9
В _source
нужно указать только Hierarchy
, а не Hierarchy[0]
Привет @Val, пробовал оба способа, все равно не работает, для col4 данные не заполняются в csv
Какая у вас версия? Я только что протестировал его на Logstash 7.16, и он работает.
Можете ли вы показать образец документа?
Привет @Val, пожалуйста, найдите прикрепленный образец документа выше
Спасибо, я перепроверил и обновил свой ответ
Большое спасибо @Val, это сработало !!, потому что вывод basePrice печатает в этом формате 0,9999e3, должно быть 999,9, в чем может быть проблема
Я обновил свой ответ, чтобы сделать это немного по-другому
Привет @Val, большое спасибо, это сработало !!, для чего используется %{} в приведенном выше запросе, когда использовать add_field и когда использовать переименование в фильтре logstash
Круто, рад, что помогло! %{...}
упорядочивает значение. add_field
создает новое поле из других полей и rename
просто переименовывает существующее
Привет @Val, спасибо за разъяснение, если базовой цены не существует, то мы получаем %{[pricingInfo][0][basePrice]} на выходе. мы можем использовать так: filter { mutate { rename => { "[pricingInfo][0][basePrice]" => "list_price" } convert => { "list_price" => "float" }
Давайте, может быть, создадим новую тему, так как это уже другая проблема, если вы не против.
Привет @Val, проблема решена после использования вышеуказанного запроса с convert => { "list_price" => "float" }. не могли бы вы предложить, есть ли у вас какие-либо материалы по этому запросу: stackoverflow.com/questions/75312170/…
Привет @Val Большое спасибо за ваш ответ, попробовал запрос выше, он не сработал, обновил вопрос с запросами, которые пробовали