У меня есть потоковый фрейм данных, который может выглядеть примерно так:
+--------------------+--------------------+
| owner| fruits|
+--------------------+--------------------+
|Brian | apple|
Brian | pear |
Brian | date|
Brian | avocado|
Bob | avocado|
Bob | apple|
........
+--------------------+--------------------+
Я выполнил groupBy, agg collect_list, чтобы навести порядок.
val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")
на выходе получается одна строка для каждого владельца и массив каждого фрукта. Теперь я хотел бы присоединить этот очищенный массив к исходному фрейму данных потоковой передачи, отбросив столбец фруктов и просто получив столбец плод
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")
это, кажется, работает в моей голове, но Спарк, похоже, не согласен.
Я получаю
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
+- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]
Когда я превращаю все в статический фрейм данных, он работает нормально. Разве это невозможно в контексте потоковой передачи?





Вы пробовали переименовать имя столбца? Есть аналогичная проблема https://issues.apache.org/jira/browse/SPARK-19860
@Brian, этот код работает для вас? Я использую Kafka как для источника, так и для приемника, и, если я использую outputMode("append"), я получаю Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;, а если я использую outputMode("update"), я получаю Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;.
Я действительно понял это и забыл обновить ветку.