Каждый час я получал обновления значений в виде нового DataFrame. Мне приходится уменьшать DataFrames для дедупликации сущностей и отслеживания истории обновлений значений. Поскольку логика сокращения слишком сложна, я преобразовываю DataFrames в JavaRDD, уменьшая, а затем преобразовывая JavaRDD обратно в DataFrame.
Проблема в том, что после сокращения мне приходится использовать вложенные структуры данных.
Я прочитал вывод схемы с использованием отражения, но все равно мне не понятно:
Поддерживает ли Spark SQL только вложенные массивы примитивов или вложенные массивы bean-компонентов?
Почему код варианта 1 не работает, а код случая 2 работает?
Из следующего кода я получил:
scala.MatchError: History(timestamp=1970-01-01 00:00:00.0, value=10.0) (of class com.somepackage.History)
Итак, я могу сделать вывод, что Spark не поддерживает вложенный массив bean-компонентов. Но см. Случай 2.
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Entity implements Serializable {
private Integer id;
private History[] history;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class History implements Serializable {
private Timestamp timestamp;
private Double value;
}
JavaRDD<Entity> rdd = JavaSparkContext
.fromSparkContext(spark().sparkContext())
.parallelize(asList(
new Entity(1, new History[] {
new History(new Timestamp(0L), 10.0)
})
));
spark()
//EXCEPTION HERE!
.createDataFrame(rdd, Entity.class)
.show();
С другой стороны, следующий код корректно работает с вложенными массивами bean-компонентов:
Dataset<Entity> dataSet = spark()
.read()
.option("multiLine", true).option("mode", "PERMISSIVE")
.schema(fromJson("/data/testSchema.json"))
.json(getAbsoluteFilePath("data/testData.json"))
.as(Encoders.bean(Entity.class));
JavaRDD<Entity> rdd = dataSet
.toJavaRDD()
.mapToPair(o -> tuple(RowFactory.create(o.getId()), o))
.reduceByKey((o1, o2) -> o2)
.values()
.saveAsTextFile("output.json");
-------
private String getAbsoluteFilePath(String relativePath) {
return this
.getClass()
.getClassLoader()
.getResource("")
.getPath() + relativePath;
}
private StructType fromJson(String pathToSchema) {
return (StructType) StructType.fromJson(
new BufferedReader(
new InputStreamReader(
Resources.class.getResourceAsStream(pathToSchema)
)
)
.lines()
.collect(Collectors.joining(System.lineSeparator()))
);
}
testData.json
[
{
"id": 1,
"history": [
{
"timestamp": "2018-10-29 23:11:44.000",
"value": 12.5
}
]
},
{
"id": 1,
"history": [
{
"timestamp": "2018-10-30 14:43:05.000",
"value": 13.2
}
]
}
]
testSchema.json
{
"type": "struct",
"fields": [
{
"name": "id",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "history",
"type": {
"type": "array",
"elementType": {
"type": "struct",
"fields": [
{
"name": "timestamp",
"type": "timestamp",
"nullable": true,
"metadata": {}
},
{
"name": "value",
"type": "double",
"nullable": true,
"metadata": {}
}
]
},
"containsNull": true
},
"nullable": true,
"metadata": {}
}
]
}
@SiLaf см. Рабочий пример «Случай 2». Вы можете использовать массив типов структур




Вы поняли это? В частности, вы получили ответ на вопрос «Поддерживает ли Spark SQL только вложенные массивы примитивов или вложенные массивы bean-компонентов?»