Elasticsearch объединяет документы в ответ

У меня есть данные в 3 индексах. Я хочу создать отчет по счету, используя информацию из других индексов. Например, ниже приведены образцы документов для каждого индекса.

Индекс пользователей

{
    "_id": "userId1",
    "name": "John"
}

Индекс счета-фактуры

{
    "_id": "invoiceId1",
    "userId": "userId1",
    "cost": "10000",
    "startdate": "",
    "enddate": ""
}

Индекс заказов

{
    "_id": "orderId1",
    "userId": "userId1",
    "productName": "Mobile"
}

Я хочу создать отчет по счету, объединив информацию из этих трех индексов следующим образом

{
    "_id": "invoiceId1",
    "userName": "John",
    "productName": "Mobile",
    "cost": "10000",
    "startdate": "",
    "enddate": ""
}

Как написать запрос Elasticsearch, который возвращает ответ путем объединения информации из других индексных документов?

4
0
37
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы не могу выполняете соединения во время запроса в Elasticsearch, и вам нужно будет денормализовать ваши данные, чтобы эффективно извлекать и группировать их.


Сказав это, вы могли:

  1. использовать многоцелевой синтаксис и запрашивать сразу несколько индексов
  2. используйте запрос OR для id и userId - поскольку любой из них упоминается Хотя бы один раз в любой ваших документов
  3. а затем тривиально объедините свои данные с помощью инструмента сопоставления / сокращения под названием сценарии агрегирования показателей

Небольшое примечание: вы не сможете использовать ключевое слово _id в своих документах, потому что оно зарезервировано.

Предполагая, что ваши документы и индексы структурированы следующим образом:

POST users_index/_doc
{"id":"userId1","name":"John"}

POST invoices_index/_doc
{"id":"invoiceId1","userId":"userId1","cost":"10000","startdate":"","enddate":""}

POST orders_index/_doc
{"id":"orderId1","userId":"userId1","productName":"Mobile"}

Вот как может выглядеть зашифрованное агрегирование показателей:

POST users_index,invoices_index,orders_index/_search
{
  "size": 0, 
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "id.keyword": {
              "value": "userId1"
            }
          }
        },
        {
          "term": {
            "userId.keyword": {
              "value": "userId1"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "group_by_invoiceId": {
      "scripted_metric": {
        "init_script": "state.users = []; state.invoices = []; state.orders = []",
        "map_script": """
          def source = params._source;
          
          if (source.containsKey("name")) {
            // we're dealing with the users index
            state.users.add(source);
          } else if (source.containsKey("cost")) {
            // we're dealing with the invoices index
            state.invoices.add(source);
          } else if (source.containsKey("productName")) {
            // we're dealing with the orders index
            state.orders.add(source);
          }
        """,
        "combine_script": """
          def non_empty_state = [:];
          for (entry in state.entrySet()) {
            if (entry != null && entry.getValue().length > 0) {
              non_empty_state[entry.getKey()] = entry.getValue();
            }
          }
          return non_empty_state;
        """,
        "reduce_script": """
          def final_invoices = [];
          
          def all_users = [];
          def all_invoices = [];
          def all_orders = [];
          
          // flatten all resources
          for (state in states) {
            for (kind_entry in state.entrySet()) {
              def map_kind = kind_entry.getKey();
              if (map_kind == "users") {
                all_users.addAll(kind_entry.getValue());
              } else if (map_kind == "invoices") {
                all_invoices.addAll(kind_entry.getValue());
              } else if (map_kind == "orders") {
                all_orders.addAll(kind_entry.getValue());
              } 
            }
          }
          
          // iterate the invoices and enrich them
          for (invoice_entry in all_invoices) {
            def invoiceId = invoice_entry.id;
            def userId = invoice_entry.userId;
            def userName = all_users.stream().filter(u -> u.id == userId).findFirst().get().name;
            def productName = all_orders.stream().filter(o -> o.userId == userId).findFirst().get().productName;
            def cost = invoice_entry.cost;
            def startdate = invoice_entry.startdate;
            def enddate = invoice_entry.enddate;
            
            final_invoices.add([
              'id': invoiceId,
              'userName': userName,
              'productName': productName,
              'cost': cost,
              'startdate': startdate,
              'enddate': enddate
            ]);
          }
          
          return final_invoices;
        """
      }
    }
  }
}

который вернется

{
  ...
  "aggregations" : {
    "group_by_invoiceId" : {
      "value" : [
        {
          "cost" : "10000",
          "enddate" : "",
          "id" : "invoiceId1",
          "userName" : "John",
          "startdate" : "",
          "productName" : "Mobile"
        }
      ]
    }
  }
}

Подводя итог, можно сказать, что существуют обходные пути для достижения соединений во время запроса. В то же время подобные скрипты не следует использовать в продакшене, потому что они могут длиться вечно.

Вместо этого эту агрегацию следует эмулировать вне Elasticsearch после того, как запрос будет разрешен и вернет специфичные для индекса совпадения.

Кстати, я установил size: 0, чтобы он возвращал просто результатов агрегации, поэтому увеличьте этот параметр, если вы хотите получить несколько реальных совпадений.

Другие вопросы по теме