У меня ниже структура
корень
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: struct (nullable = true)
| | | |-- **{ program id }**: struct (nullable = true)
| | | | |-- Date: timestamp (nullable = true)
| | | | |-- Name: string (nullable = true)
| | | | |-- Some_Flags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = true)
| | | | | |-- def: boolean (nullable = true)
| | | | | |-- ghi: boolean (nullable = true)
| | | | | |-- xyz: boolean (nullable = true)
“groups” : [
{
… some other fields …
“programs” : {
“123c12b123456c1d76a4f265f10f20a0” : {
“name” : “test_program_1”,
“some_flags” : {
“abc” : true,
“def” : true,
“ghi” : false,
“xyz” : true
},
“date” : ISODate(“2019–11–16T03:29:00.000+0000”)
}
}
]
val data = spark.read.json("path").map(customParser) How do I use custom parser to map to case class?
данные поступают из mongo db. Нужно распределить синтаксический анализ, чтобы я мог перебирать каждую строку.
сделано пожалуйста проверьте .спасибо
Поскольку документ json имеет переменный ключ (program id
не является постоянным ключом, но меняется для каждой записи), Spark не может вывести схему. Один из вариантов — обработать документ вручную:
Классы кейсов:
case class SomeFlags(abc: Boolean, def1: Boolean, ghi: Boolean, xyz: Boolean)
case class Program(var programId: String, date: String, name: String, someFlags: SomeFlags)
case class Group(programs: Array[Program])
case class Groups(groups: Array[Group])
сопутствующие объекты для извлечения полей данных из строки json:
object Groups {
def unapply(values: Map[String, Object]) = try {
val groups = values("groups").asInstanceOf[List[Map[String, Object]]]
val grps = new ListBuffer[Group]()
for (group <- groups) {
val Group(grp) = group
grps += grp
}
Some(Groups(Array(grps: _*)))
} catch {
case NonFatal(ex) => {
println(ex)
None
}
}
}
object Group {
def unapply(values: Map[String, Object]) = try {
val programs = values("programs").asInstanceOf[Map[String, Object]]
val prgs = new ListBuffer[Program]()
for ((k, v) <- programs) {
val Program(prg) = v.asInstanceOf[Map[String, Object]];
prg.programId = k;
prgs += prg;
}
Some(Group(Array(prgs: _*)))
} catch {
case NonFatal(ex) => {
println(ex)
None
}
}
}
object Program {
def unapply(values: Map[String, Object]) = try {
val SomeFlags(flags) = values("some_flags").asInstanceOf[Map[String, Object]]
Some(Program("pid", values("date").asInstanceOf[String], values("name").asInstanceOf[String], flags))
} catch {
case NonFatal(ex) => {
println(ex)
None
}
}
}
object SomeFlags {
def unapply(values: Map[String, Object]) = try {
Some(SomeFlags(values("abc").asInstanceOf[Boolean], values("def").asInstanceOf[Boolean], values("ghi").asInstanceOf[Boolean], values("xyz").asInstanceOf[Boolean]))
} catch {
case NonFatal(ex) => {
println(ex)
None
}
}
}
Критическая часть здесь находится внутри Group.unapply
, где prg.programId
вручную устанавливается на ключ карты, содержащей все программы.
Наконец код Spark. DataframeReader.textFile используется для чтения файла (каждая строка должна содержать целый документ Json). Результатом является Dataset[String]
, и любой другой источник данных, который создает фрейм данных, содержащий один полный документ Json в строке, также будет работать.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
val ds: Dataset[String] = spark.read.textFile(<path to file>)
val ds2: Dataset[Groups] = ds.map(s => {
val mapper = new ObjectMapper() with ScalaObjectMapper //https://stackoverflow.com/a/20034844/2129801
mapper.registerModule(DefaultScalaModule)
val obj = mapper.readValue[Map[String, Object]](s)
val Groups(groups) = obj
groups
})
ds2
теперь имеет схему:
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- programId: string (nullable = true)
| | | | |-- date: string (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- someFlags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = false)
| | | | | |-- def1: boolean (nullable = false)
| | | | | |-- ghi: boolean (nullable = false)
| | | | | |-- xyz: boolean (nullable = false)
Что нужно улучшить:
unapply
map
на mapPartitions
, чтобы улучшить производительность
Не могли бы вы добавить несколько строк входного файла json?