Spark анализирует вложенный json с переменными ключами json

У меня ниже структура

корень

 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: struct (nullable = true)
 |    |    |    |-- **{ program id }**: struct (nullable = true)
 |    |    |    |    |-- Date: timestamp (nullable = true)
 |    |    |    |    |-- Name: string (nullable = true)
 |    |    |    |    |-- Some_Flags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)




“groups” : [
 {
  … some other fields …
  “programs” : {
     “123c12b123456c1d76a4f265f10f20a0” : {
        “name” : “test_program_1”, 
        “some_flags” : {
           “abc” : true, 
           “def” : true, 
           “ghi” : false, 
           “xyz” : true
        }, 
        “date” : ISODate(“2019–11–16T03:29:00.000+0000”)
     }
 }
]

val data = spark.read.json("path").map(customParser) How do I use custom parser to map to case class?

данные поступают из mongo db. Нужно распределить синтаксический анализ, чтобы я мог перебирать каждую строку.

Не могли бы вы добавить несколько строк входного файла json?

werner 02.04.2022 15:12

сделано пожалуйста проверьте .спасибо

Dot Net Dev 19 02.04.2022 16:02
Как сделать HTTP-запрос в Javascript?
Как сделать HTTP-запрос в Javascript?
В JavaScript вы можете сделать HTTP-запрос, используя объект XMLHttpRequest или более новый API fetch. Вот пример для обоих методов:
1
2
39
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку документ json имеет переменный ключ (program id не является постоянным ключом, но меняется для каждой записи), Spark не может вывести схему. Один из вариантов — обработать документ вручную:

Классы кейсов:

case class SomeFlags(abc: Boolean, def1: Boolean, ghi: Boolean, xyz: Boolean)
case class Program(var programId: String, date: String, name: String, someFlags: SomeFlags)
case class Group(programs: Array[Program])
case class Groups(groups: Array[Group])

сопутствующие объекты для извлечения полей данных из строки json:


object Groups {
  def unapply(values: Map[String, Object]) = try {
    val groups = values("groups").asInstanceOf[List[Map[String, Object]]]
    val grps = new ListBuffer[Group]()
    for (group <- groups) {
      val Group(grp) = group
      grps += grp
    }
    Some(Groups(Array(grps: _*)))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}
  
object Group {
  def unapply(values: Map[String, Object]) = try {
    val programs = values("programs").asInstanceOf[Map[String, Object]]
    val prgs = new ListBuffer[Program]()
    for ((k, v) <- programs) {
      val Program(prg) = v.asInstanceOf[Map[String, Object]];
      prg.programId = k;
      prgs += prg;
    }
    Some(Group(Array(prgs: _*)))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

object Program {
  def unapply(values: Map[String, Object]) = try {
    val SomeFlags(flags) = values("some_flags").asInstanceOf[Map[String, Object]]
    Some(Program("pid", values("date").asInstanceOf[String], values("name").asInstanceOf[String], flags))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

object SomeFlags {
  def unapply(values: Map[String, Object]) = try {
    Some(SomeFlags(values("abc").asInstanceOf[Boolean], values("def").asInstanceOf[Boolean], values("ghi").asInstanceOf[Boolean], values("xyz").asInstanceOf[Boolean]))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

Критическая часть здесь находится внутри Group.unapply, где prg.programId вручную устанавливается на ключ карты, содержащей все программы.

Наконец код Spark. DataframeReader.textFile используется для чтения файла (каждая строка должна содержать целый документ Json). Результатом является Dataset[String], и любой другой источник данных, который создает фрейм данных, содержащий один полный документ Json в строке, также будет работать.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

val ds: Dataset[String] = spark.read.textFile(<path to file>)

val ds2: Dataset[Groups] = ds.map(s => {
  val mapper = new ObjectMapper() with ScalaObjectMapper //https://stackoverflow.com/a/20034844/2129801
  mapper.registerModule(DefaultScalaModule)
  val obj = mapper.readValue[Map[String, Object]](s)
  val Groups(groups) = obj
  groups
})

ds2 теперь имеет схему:

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- programId: string (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- someFlags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = false)
 |    |    |    |    |    |-- def1: boolean (nullable = false)
 |    |    |    |    |    |-- ghi: boolean (nullable = false)
 |    |    |    |    |    |-- xyz: boolean (nullable = false)

Что нужно улучшить:

  • улучшенная обработка ошибок в методах unapply
  • замените функцию map на mapPartitions, чтобы улучшить производительность

Другие вопросы по теме