Поддержка вложенных массивов и bean-компонентов Spark SQL

Каждый час я получал обновления значений в виде нового DataFrame. Мне приходится уменьшать DataFrames для дедупликации сущностей и отслеживания истории обновлений значений. Поскольку логика сокращения слишком сложна, я преобразовываю DataFrames в JavaRDD, уменьшая, а затем преобразовывая JavaRDD обратно в DataFrame.

Проблема в том, что после сокращения мне приходится использовать вложенные структуры данных.

Вопрос

Я прочитал вывод схемы с использованием отражения, но все равно мне не понятно:

Поддерживает ли Spark SQL только вложенные массивы примитивов или вложенные массивы bean-компонентов?

Почему код варианта 1 не работает, а код случая 2 работает?

Дело 1

Из следующего кода я получил:

scala.MatchError: History(timestamp=1970-01-01 00:00:00.0, value=10.0) (of class com.somepackage.History)

Итак, я могу сделать вывод, что Spark не поддерживает вложенный массив bean-компонентов. Но см. Случай 2.

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Entity implements Serializable {

    private Integer id;
    private History[] history;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class History implements Serializable {

    private Timestamp timestamp;
    private Double value;
}

JavaRDD<Entity> rdd = JavaSparkContext
    .fromSparkContext(spark().sparkContext())
    .parallelize(asList(
        new Entity(1, new History[] {
           new History(new Timestamp(0L), 10.0)
        })
    ));
spark()
    //EXCEPTION HERE!
    .createDataFrame(rdd, Entity.class)
    .show();

Случай 2

С другой стороны, следующий код корректно работает с вложенными массивами bean-компонентов:

Dataset<Entity> dataSet = spark()
    .read()
    .option("multiLine", true).option("mode", "PERMISSIVE")
    .schema(fromJson("/data/testSchema.json"))
    .json(getAbsoluteFilePath("data/testData.json"))
    .as(Encoders.bean(Entity.class));

JavaRDD<Entity> rdd = dataSet
    .toJavaRDD()
    .mapToPair(o -> tuple(RowFactory.create(o.getId()), o))
    .reduceByKey((o1, o2) -> o2)
    .values()
    .saveAsTextFile("output.json");

-------

private String getAbsoluteFilePath(String relativePath) {
    return this
        .getClass()
        .getClassLoader()
        .getResource("")
        .getPath() + relativePath;
}

private StructType fromJson(String pathToSchema) {
    return (StructType) StructType.fromJson(
        new BufferedReader(
            new InputStreamReader(
                Resources.class.getResourceAsStream(pathToSchema)
            )
        )
            .lines()
            .collect(Collectors.joining(System.lineSeparator()))
    );
}

testData.json

[
  {
    "id": 1,
    "history": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "value": 12.5
      }
    ]
  },
  {
    "id": 1,
    "history": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "value": 13.2
      }
    ]
  }
]

testSchema.json

{
  "type": "struct",
  "fields": [
    {
      "name": "id",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "history",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "value",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}

Вы поняли это? В частности, вы получили ответ на вопрос «Поддерживает ли Spark SQL только вложенные массивы примитивов или вложенные массивы bean-компонентов?»

sil 16.10.2019 10:39

@SiLaf см. Рабочий пример «Случай 2». Вы можете использовать массив типов структур

VB_ 16.10.2019 11:27
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
2
220
0

Другие вопросы по теме