У меня потоковый процесс в основном выглядит так
Stream(Int, Boolean, Int).Keyby(0, 1).Window().process()
Ключевым моментом является то, что я хочу определить комбинированный ключ, а затем обработать его. Однако, если я использую keyby(0, 1)
и process(... Key: (Int, Boolean), ...)
, ключевой тип в процессе всегда выдает ошибку. Пытался определить keyby(_._1, _._2)
, но не правильно.
Итак, в любом случае определить комбинированный ключ с помощью scala, чтобы я мог вывести тип ключа, такой как (Int, Boolean)
, в следующей функции процесса?
Заранее спасибо!
Проблема в том, что input.keyBy(0, 1).timeWindow(Time.days(1))
создает KeyedStream[(Int, Boolean, Int), Tuple]
, где Tuple
- это кортежный класс Flink. Это также будет тип ключевого параметра функции process
. Чтобы получить доступ к полям Tuple
, вам необходимо вызвать tuple.[T]getField(idx)
с T
в качестве типа поля.
Если вы хотите использовать кортеж Scala в качестве ключа к ProcessWindowFunction
, вам необходимо определить KeySelector
. Следующий фрагмент делает свое дело:
input
.keyBy(a => (a._1, a._2))
.timeWindow(Time.days(1))
.process(new ProcessWindowFunction[(Int, Boolean, Int), Int, (Int, Boolean), TimeWindow] {
override def process(key: (Int, Boolean), context: Context, elements: Iterable[(Int, Boolean, Int)], out: Collector[Int]): Unit = {
out.collect(key._1)
}
})