Действия над rdd выполняются на одном узле, когда члены содержат ленивые значения, оцениваемые путем отражения.

Я пытаюсь преобразовать RDD объектов в DataFrame с той особенностью, что указанные объекты имеют ленивые значения.

Чтобы сделать это без необходимости указывать схему каждого объекта вручную, я создал трейт, который преобразует этот объект в формат JSON (или любой другой текстовый формат). Код следующий:

trait Formattable {

  lazy val fieldsMap: Map[String, Any] = {
    val mirror: Mirror = runtimeMirror(this.getClass.getClassLoader)
    val classSymbol: ClassSymbol = mirror.classSymbol(this.getClass)
    val instanceMirror: InstanceMirror = mirror.reflect(this)

    val laziesNames = classSymbol.info.decls.filter(_.toString.matches("(^|.* )lazy.*$")).map(_.name.toString.trim)
    val nonMethodsNames = classSymbol.info.decls.filter(!_.isMethod).map(_.name.toString.trim)
    val nonLaziesNames = nonMethodsNames.toSet -- laziesNames.toSet

    (laziesNames
      .map(name => name -> instanceMirror.reflectMethod(classSymbol.toType.member(TermName(name)).asMethod)()) ++
      nonLaziesNames
        .map(name => name -> instanceMirror.reflectField(classSymbol.toType.member(TermName(name)).asTerm).get)).toMap
  }

  def toJSON: String = {
    val fieldsStr =
      fieldsMap.map(e => s""""${e._1.trim}": ${format(e._2)}""").mkString(",")
    s"""{$fieldsStr}"""
  }

  def format(raw: Any, listSeparator: String = ","): String =
    raw match {
      case null => """"""""
      case _: String => s""""${raw.toString}""""
      case _: Char => s""""${raw.toString}""""
      case _: Boolean => raw.toString
      case _: Int => raw.toString
      case _: Float => raw.toString
      case _: Double => raw.toString
      case _: List[Any] =>
        s""""${
          raw.asInstanceOf[List[Any]]
            .map(format(_, listSeparator)).mkString(listSeparator).replace('"', ''')
        }""""
      case _: Map[Any, Any] =>
        s""""${
          raw.asInstanceOf[Map[Any, Any]].values
            .map(format(_, listSeparator)).mkString(listSeparator).replace('"', ''')
        }""""
      case opt: Option[Any] =>
        s""""${
          opt match {
            case Some(value) => value.toString
            case None => ""
          }
        }""""
      case obj: Product =>
        val objClassMirror: Mirror = runtimeMirror(obj.getClass.getClassLoader)
        val objClassSymbol: ClassSymbol = objClassMirror.classSymbol(obj.getClass)
        val objInstanceMirror: InstanceMirror = objClassMirror.reflect(obj)
        s""""${
          objClassSymbol.info.decls
            .filter(!_.isMethod) // Filter only attributes
            .map(sym => format(objInstanceMirror.reflectField(sym.asTerm).get, listSeparator))
            .mkString(listSeparator)
        }""""
      case date: Date => s""""${new SimpleDateFormat("yyyy-MM-dd").format(date)}""""
      case _ => s""""${raw.toString}""""
    }


}

Затем мои классы расширяют эту черту, и я конвертирую их в JSON, вызывая метод toJSON (), а затем читаю их с помощью метода, предоставленного SparkSession:

case class Test (
                  field1: String,
                  field2: Double,
                  ...
                  fieldN: Double
                ) extends Formattable {
  lazy val lazyField1: Double = field2 * fieldN
  lazy val lazyField2: Double = fieldN / field2
  lazy val lazyField3: Double = lazyField1 * lazyField2
  ...
}

Код несколько сложнее, поэтому мне пришлось использовать ленивые значения. Проблема возникает, когда я пытаюсь преобразовать RDD объектов этого типа в DataFrame, поскольку действие выполняется в одном узле, тем самым теряя возможность распараллеливания Spark:

val spark: SparkSession = ...
val myRDD: RDD[Test] = ... 

import spark.implicits._
val df: DataFrame = spark.read.json(myRDD.map(_.toJSON).toDS())

Чтобы поместить вас в контекст, я использую следующие версии:

Spark 2.3.0, Scala 2.11.0, Мастер: пряжа-клиент

Ты хоть представляешь, почему это может быть? Заранее большое спасибо.

Можете ли вы использовать существующий сериализатор JSON? Скорее всего, он будет работать лучше, будет быстрее и не потребует обслуживания с вашей стороны.

Ethan 10.08.2018 17:50

Спасибо за ответ, Итан. Ранее я пробовал сериализовать с помощью json4s, который является одной из самых надежных библиотек для Scala, и он не требует ленивого val во время генерации вывода. Вдобавок у меня также были проблемы при интерпретации переменных Broadcast. Подозреваю, что эта проблема может быть из-за использования отражения, продолжаю исследовать.

Adrián Ramos 14.08.2018 11:10
0
2
42
0

Другие вопросы по теме