У меня есть несколько таблиц (с разной степенью различий в схемах, но с общим набором полей), которые я хотел бы объединить и загрузить из бронзы -> серебра поэтапно. Таким образом, цель состоит в том, чтобы перейти от нескольких таблиц к одной таблице, используя DLT.
Пример:
X_Events
Y_Events
.... N_Events
To: All_Events
Я использую цикл for для просмотра всех баз данных -> таблицы, а затем выполняю readStream
, а затем UnionByName
.
Однако, если есть дополнительная таблица, добавленная/измененная динамически, которую мне нужно обработать при следующем запуске, я получаю ошибку контрольной точки.
There are [8] sources in the checkpoint offsets and now there are
[6] sources requested by the query. Cannot continue.
Есть ли способ решить это динамически?
Должен ли я создавать свою собственную инкрементную логику? Есть ли лучший способ добиться этого?
Это задокументированное ограничение Spark Structured Streaming:
Changes in the number or type (i.e. different source) of input sources: This is not allowed.
Но из вашего описания я вижу, что вам может и не понадобиться использовать UnionByName
— вы можете просто иметь N независимых потоков, которые будут писать в одну и ту же таблицу. В случае, когда вы просто добавляете в таблицу, одновременные добавления не приведут к конфликтам записи (каждый поток независим!):
bronze 1 \
bronze 2 \
bronze 3 >--> append to a Silver table
.......... /
bronze N /
В случае, если вам нужно выполнить слияние с целевой таблицей или некоторые другие изменения в ней, вы все равно можете следовать тому же подходу, присоединившись к промежуточной таблице, а затем получив поток из нее и объединив/обновив целевую таблицу. :
bronze 1 \
bronze 2 \
bronze 3 >--> append to an intermediate table --> merge into Silver
.......... /
bronze N /