Я пытаюсь проиндексировать DataFrame
приведенной ниже схемы в ElasticSearch, используя коннектор elasticsearch-hadoop.
|-- ROW_ID: long (nullable = false)
|-- SUBJECT_ID: long (nullable = false)
|-- HADM_ID: long (nullable = true)
|-- CHARTDATE: date (nullable = false)
|-- CATEGORY: string (nullable = false)
|-- DESCRIPTION: string (nullable = false)
|-- CGID: integer (nullable = true)
|-- ISERROR: integer (nullable = true)
|-- TEXT: string (nullable = true)
При записи этого DataFrame в ElasticSearch поле «CHARTDATE» записывается как длинное. Согласно документации для коннектора, который я использую (показано ниже) поля DateType
в Spark должны быть записаны как даты в формате строки в ElasticSearch. Поскольку я надеялся создать несколько визуализаций в Kibana, используя поля даты, их написание в виде длинных строк оказалось проблематичным.
https://www.elastic.co/guide/en/elasticsearch/hadoop/6.4/spark.html
Код, использованный для возникновения ошибки
val elasticOptions = Map(
"es.nodes" -> esIP,
"es.port" -> esPort,
"es.mapping.id" -> primaryKey,
"es.index.auto.create" -> "yes",
"es.nodes.wan.only" -> "true",
"es.write.operation" -> "upsert",
"es.net.http.auth.user" -> esUser,
"es.net.http.auth.pass" -> esPassword,
"es.spark.dataframe.write.null" -> "true",
"es.mapping.date.rich" -> "true"
)
castedDF.saveToEs(index, elasticOptions)
Есть ли шаг, который мне не хватает, чтобы эти значения записывались как даты ES?
Давно не пользовался Spark с ElasticSearch; Но эта проблема с DateType меня очень раздражала.
Что я делал, чтобы эта работа работала, было: * Преобразуйте DateType в временную метку эпохи в Spark (не уверен, что здесь необходим шаг) * Укажите в Kibana или с помощью запроса curL PUT, когда я инициализирую схему индекса, что поле CHARTDATE будет иметь тип date, например:
PUT /spark
{
"mappings": {
"log": {
"properties": {
"CHARTDATE": {
"type": "date"
}
}
}
}
}
Я не знаю, изменил ли Elastic 6.4 что-нибудь, и если вы найдете лучшее решение, я буду признателен, если вы поделитесь с нами позже!
Я знаю, что на самом деле это не лучшее решение - нужно ПОЛУЧИТЬ индекс перед запуском действия saveToEs из Spark. Но это действительно было то, что помогло мне.
Согласно документации: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapping.html#mapping-date
Вы должны использовать формат https://en.wikipedia.org/wiki/ISO_8601.
Спасибо за ответ! Я пробовал использовать currentDF.withColumn("CHARTDATE", to_date(col("CHARTDATE"), "yyyy-mm-dd'T'hh:mm:ss.SSSZ"))
и, к сожалению, получил ту же проблему. Я рассмотрю это преобразование вместе с методом раздачи @tricky ниже.
Кроме того, для отладки того, что отправляет Spark, вы можете использовать HTTP-прокси, например Charles, это очень помогает понять, как работает es4hadoop. Держать нас в курсе ! Благодарность
В итоге просто нужно было создать метод, который заполнял индекс. Отметки времени и даты обрабатывались правильно.
@mongolol не могли бы вы пояснить, что вы имеете в виду под seeded the index
? У меня такая же проблема
@eugene Я рекурсивно просматриваю схему, возвращаемую df.schema
, для создания сопоставлений JSON, а затем отправляю это в ES перед записью данных. Обратитесь к elastic.co/guide/en/elasticsearch/reference/current/… для API сопоставлений ES и stackoverflow.com/questions/37471346/… (ответ плоской схемы) для рекурсивного просмотра схемы. В этом случае вы ищете поля DateType
и TimestampType
.
Использование запросов на размещение для заполнения индекса ES полями даты решило мою проблему. Спасибо за предложение!