У меня есть DataBricks DataFrame с
Columns : tno,data_value
Вывод первого запуска Databricks:
tno, data_value
1,hdjsjsjnsns
2,dhjdjdjsnsn
3,jdjsjsjsjsjjs
Когда я снова запускаю тот же блокнот через некоторое время, он должен выглядеть так:
tno, data_value
4,hdjsjsjnsns
5,dhjdjdjsnsn
6,jdjsjsjsjsjjs
Точно так же, как «Последовательность» Oracle или SQL Server.
Я выполнил rownumber() и monotically_increasing_id().
Но они создаются с самого начала с 1 для каждого запуска.
Итак, просто думайте об этом как о транзакционном_ключе. Как этого добиться в pyspark Databricks.
Чтобы достичь вашего требования, вам нужно получить максимальное значение в фрейме данных последнего запуска и, используя его, преобразовать column
в текущем запуске.
После первого запуска блокнота сохраните фрейм данных в файле блоков данных. В следующем блокноте прочитайте данные из этого блокнота в фрейме данных и получите максимальное значение из своего столбца.
После этого добавьте столбец возрастающего идентификатора my_id
в фрейм данных, используя row_number()
, начиная с 1
.
from pyspark.sql.functions import *
from pyspark.sql.window import *
last_max = my_df.select(max(my_df.tno)).collect()[0]['max(tno)']
print(last_max)
window = Window.orderBy(col('tno'))
df_id = my_df.withColumn('my_id', row_number().over(window))
df_id.show()
Теперь добавьте значение last_max
в столбец my_id
, сохраните каждую строку в столбце tno
и удалите дополнительный столбец my_id
.
res_df=df_id.withColumn("tno", col("my_id") + lit(last_max)).drop('my_id')
res_df.display()
В конце перезапишите фрейм данных в тот же файл блоков данных, чтобы то же самое можно было сделать при следующем запуске блокнота.
Результат:
Вы также можете сохранить максимальное значение столбца tno
в файл и использовать его при следующем запуске.
Вы можете просмотреть эту ссылку от @ ram.sankarasubramanian, чтобы узнать об этом больше.
Если он параллельный, вам нужно использовать столбец идентификаторов с использованием временной таблицы SQL, как предложено @Chris, AFAIK, вам нужно либо запустить его в последовательности, либо использовать столбец идентификаторов, поскольку у pyspark нет столбца идентификаторов или транзакционного ключа.
Я выполнил rownumber() и monotically_increasing_id().
Но они создаются с самого начала с 1 для каждого запуска.
Итак, просто думайте об этом как о транзакционном_ключе. Как этого добиться в pyspark Databricks.
для действительно уникальных номеров и хранения идентификаторов с разницей в блоках данных вы можете использовать: identity_columns
Если вам нужен общий, но гарантированно уникальный, вам нужна либо генерация стиля снежинки (Качество имеет 160-битный вариант, не тестируется через pyspark), либо центральный сервер, который резервирует блоки (как это делает Oracle) и разделы карты.
Это работает, если я работаю над таблицей SQL. Но мне нужно получить их в фрейме данных, чтобы я мог записать их в целевые файлы Json. Итак, здесь я даже не могу использовать временную дельта-таблицу в качестве сцены, потому что мне нужно сгенерировать только эти 3 записи json, а не всю дельта-таблицу, трудно выбрать точные данные.
Тогда, похоже, вам нужно использовать функцию стиля снежинки, вы можете попробовать функцию «Качество» (должна работать с pyspark, но не будет работать с подключением)
Даже я подумал об этом, проблема здесь в том, что когда параллельные экземпляры выполняют одно и то же задание, например, если запуск 1 и запуск 2 выполняются почти одновременно. Эта логика не сработает. Итак, если быть точным, нужен что-то вроде транзакционного ключа.