Индексирование полей типа даты spark как дат в elasticsearch

Я пытаюсь проиндексировать DataFrame приведенной ниже схемы в ElasticSearch, используя коннектор elasticsearch-hadoop.

 |-- ROW_ID: long (nullable = false)
 |-- SUBJECT_ID: long (nullable = false)
 |-- HADM_ID: long (nullable = true)
 |-- CHARTDATE: date (nullable = false)
 |-- CATEGORY: string (nullable = false)
 |-- DESCRIPTION: string (nullable = false)
 |-- CGID: integer (nullable = true)
 |-- ISERROR: integer (nullable = true)
 |-- TEXT: string (nullable = true)

При записи этого DataFrame в ElasticSearch поле «CHARTDATE» записывается как длинное. Согласно документации для коннектора, который я использую (показано ниже) поля DateType в Spark должны быть записаны как даты в формате строки в ElasticSearch. Поскольку я надеялся создать несколько визуализаций в Kibana, используя поля даты, их написание в виде длинных строк оказалось проблематичным.

https://www.elastic.co/guide/en/elasticsearch/hadoop/6.4/spark.html

Код, использованный для возникновения ошибки

val elasticOptions = Map(
      "es.nodes"              -> esIP,
      "es.port"               -> esPort,
      "es.mapping.id"         -> primaryKey,
      "es.index.auto.create"  -> "yes",
      "es.nodes.wan.only"     -> "true",
      "es.write.operation"    -> "upsert",
      "es.net.http.auth.user" -> esUser,
      "es.net.http.auth.pass" -> esPassword,
      "es.spark.dataframe.write.null" -> "true",
      "es.mapping.date.rich" -> "true"
    )
castedDF.saveToEs(index, elasticOptions)

Есть ли шаг, который мне не хватает, чтобы эти значения записывались как даты ES?

0
0
942
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Давно не пользовался Spark с ElasticSearch; Но эта проблема с DateType меня очень раздражала.

Что я делал, чтобы эта работа работала, было: * Преобразуйте DateType в временную метку эпохи в Spark (не уверен, что здесь необходим шаг) * Укажите в Kibana или с помощью запроса curL PUT, когда я инициализирую схему индекса, что поле CHARTDATE будет иметь тип date, например:

PUT /spark
{
 "mappings": {
  "log": {
    "properties": {
      "CHARTDATE": {
        "type": "date"
      }
    }
  }
 }
} 

Я не знаю, изменил ли Elastic 6.4 что-нибудь, и если вы найдете лучшее решение, я буду признателен, если вы поделитесь с нами позже!

Я знаю, что на самом деле это не лучшее решение - нужно ПОЛУЧИТЬ индекс перед запуском действия saveToEs из Spark. Но это действительно было то, что помогло мне.

Использование запросов на размещение для заполнения индекса ES полями даты решило мою проблему. Спасибо за предложение!

mongolol 19.09.2018 07:15

Согласно документации: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapping.html#mapping-date

Вы должны использовать формат https://en.wikipedia.org/wiki/ISO_8601.

Спасибо за ответ! Я пробовал использовать currentDF.withColumn("CHARTDATE", to_date(col("CHARTDATE"), "yyyy-mm-dd'T'hh:mm:ss.SSSZ")) и, к сожалению, получил ту же проблему. Я рассмотрю это преобразование вместе с методом раздачи @tricky ниже.

mongolol 10.09.2018 22:36

Кроме того, для отладки того, что отправляет Spark, вы можете использовать HTTP-прокси, например Charles, это очень помогает понять, как работает es4hadoop. Держать нас в курсе ! Благодарность

Thomas Decaux 12.09.2018 10:20

В итоге просто нужно было создать метод, который заполнял индекс. Отметки времени и даты обрабатывались правильно.

mongolol 19.09.2018 07:16

@mongolol не могли бы вы пояснить, что вы имеете в виду под seeded the index? У меня такая же проблема

eugene 03.01.2020 07:02

@eugene Я рекурсивно просматриваю схему, возвращаемую df.schema, для создания сопоставлений JSON, а затем отправляю это в ES перед записью данных. Обратитесь к elastic.co/guide/en/elasticsearch/reference/current/… для API сопоставлений ES и stackoverflow.com/questions/37471346/… (ответ плоской схемы) для рекурсивного просмотра схемы. В этом случае вы ищете поля DateType и TimestampType.

mongolol 07.01.2020 16:16

Другие вопросы по теме