Я хотел бы иметь возможность хранить различные связанные типы в Spark DataFrame, но работать со строго типизированными классами случаев через DataSet. Например. скажем, у меня есть признак Base
и два класса случаев A
и B
, которые расширяют этот признак:
trait Base {
def name: String
}
case class A(name: String, number: Int) extends Base
case class B(name: String, text: String) extends Base
Я хотел бы создать val lb = List[Base](A("Alice", 20), B("Bob", "Foo"))
, а затем создать DataFrame через lb.toDS()
. Неудивительно, что это не работает, поскольку для этого признака нет кодировщика для его различных расширенных классов.
Я мог бы вручную создать класс Case, представляющий структуру, которая может хранить информацию как для A
, так и для B
:
case class Struct(typ: String, name: String, number: Option[Int] = None, text: Option[String] = None)
И я мог бы добавить несколько функций для создания Struct
из экземпляра черты Base
и наоборот:
trait Base {
def name: String
def asStruct: Struct = {
this match {
case A(name, number) => Struct("A", name, number = Some(number))
case B(name, text) => Struct("B", name, text = Some(text))
}
}
}
case class Struct(typ: String, name: String, number: Option[Int] = None, text: Option[String] = None) {
def asBase: Base = {
this match {
case Struct("A", name, Some(number), None) => A(name, number)
case Struct("B", name, None, Some(text)) => B(name, text)
case _ => throw new Exception(f"Invalid Base structure {s}")
}
}
}
Затем я могу создать свой DataFrame следующим образом:
val a = A("Alice", 32)
val b = B("Bob", "foo")
val ls = List[Struct](a.asStruct, b.asStruct)
val sparkSession = spark
import sparkSession.implicits._
val df = ls.toDS()
df.show()
+---+-----+------+----+
|typ| name|number|text|
+---+-----+------+----+
| A|Alice| 32|NULL|
| B| Bob| NULL| foo|
+---+-----+------+----+
Я могу работать с этим подходом, но мне интересно, можно ли написать кодировщик, который автоматически обрабатывает класс Base
как класс Struct
, используя метод asStruct
, написанный выше?
Спасибо, я не знаю, почему я не смог найти эти статьи при поиске.
безрамочные инъекции :
// the injector
implicit val baseHolder: Injection[Base, Struct] = new Injection[Base, Struct] with Serializable {
def invert(a: Struct): Base =
a match {
case Struct("A", name, Some(number), None) => A(name, number)
case Struct("B", name, None, Some(text)) => B(name, text)
case _ => throw new Exception(f"Invalid Base structure {s}")
}
def apply(b: Base): Struct =
b match {
case A(name, number) => Struct("A", name, number = Some(number))
case B(name, text) => Struct("B", name, text = Some(text))
}
}
import frameless._
implicit val enc = TypedExpressionEncoder[Base]
import sparkSession.implicits._
val lb: Dataset[Base] = Seq[Base](A("Alice", 20), B("Bob", "Foo")).toDS
lb.show
дает:
+---+-----+------+----+
|typ| name|number|text|
+---+-----+------+----+
| A|Alice| 20|null|
| B| Bob| null| Foo|
+---+-----+------+----+
но действительно ли они на пятёрки и четверки?
lb.collect().foreach{ b =>
println(b.getClass.getName)
}
покажет А и Б.
Порядок параметров при внедрении важен: левый — это тот, для которого мы создаем внедрение, а второй параметр — это тип, который мы храним в фрейме данных.
Это очень здорово. Именно то, что я искал.
Крис, еще один вопрос. Я протестировал набор данных, сформированный из класса Case, содержащего базовые поля. Если я вызываю UDF, который принимает параметр Base, декодирование не работает... вместо этого UDF передается тип Struct. Является ли это ограничением или есть какое-то другое неявное свойство, которое я могу объявить, чтобы преобразовать структуру в A или B?
Это был мой класс Case class C(x: Base, y: Base) и мой тест: val a = A("Alice", 32) val b = B("Bob", "foo") val lb = Seq(C(a, b)) val ds: Dataset[C] = lb.toDS defprocessBase(b: Base): String = { b match { case A(name, Number) => f"$name имеет номер $ число" case B(name, text) => f"$name содержит текст $text" case _ => "Не знаю" } } valprocessBaseUdf = udf(processBase(_)) var df = ds.withColumn(" x",processBaseUdf(ds("x"))) df = df.withColumn("y",processBaseUdf(ds("y")))
вам, вероятно, следует задать это как другой вопрос и показать код. Я предполагаю, что вывод типа для C неверен, и вам следует использовать: неявный val cenc = TypedExpressionEncoder[C]
также вам нужно будет указать, тестируете ли вы локально или в затененной банке (перемещение scala с оттенком maven не работает)
Хорошо, я настрою ценс и проверю, работает ли это. Если нет, я задам еще один вопрос и покажу весь код. Спасибо за помощь.
Я задал вопрос о декодировании и UDF здесь: stackoverflow.com/questions/78349325/…