Как создать идентификатор непрерывной последовательности независимо от запусков в Databricks

У меня есть DataBricks DataFrame с

Columns : tno,data_value

Вывод первого запуска Databricks:

tno, data_value
1,hdjsjsjnsns
2,dhjdjdjsnsn
3,jdjsjsjsjsjjs

Когда я снова запускаю тот же блокнот через некоторое время, он должен выглядеть так:

tno, data_value
4,hdjsjsjnsns
5,dhjdjdjsnsn
6,jdjsjsjsjsjjs

Точно так же, как «Последовательность» Oracle или SQL Server.

Я выполнил rownumber() и monotically_increasing_id().

Но они создаются с самого начала с 1 для каждого запуска.

Итак, просто думайте об этом как о транзакционном_ключе. Как этого добиться в pyspark Databricks.

Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
1
0
63
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Чтобы достичь вашего требования, вам нужно получить максимальное значение в фрейме данных последнего запуска и, используя его, преобразовать column в текущем запуске.

После первого запуска блокнота сохраните фрейм данных в файле блоков данных. В следующем блокноте прочитайте данные из этого блокнота в фрейме данных и получите максимальное значение из своего столбца.

После этого добавьте столбец возрастающего идентификатора my_id в фрейм данных, используя row_number(), начиная с 1.

from pyspark.sql.functions import *
from pyspark.sql.window import *

last_max = my_df.select(max(my_df.tno)).collect()[0]['max(tno)']
print(last_max)

window = Window.orderBy(col('tno'))
df_id = my_df.withColumn('my_id', row_number().over(window))
df_id.show()

Теперь добавьте значение last_max в столбец my_id, сохраните каждую строку в столбце tno и удалите дополнительный столбец my_id.

res_df=df_id.withColumn("tno", col("my_id") + lit(last_max)).drop('my_id')
res_df.display()

В конце перезапишите фрейм данных в тот же файл блоков данных, чтобы то же самое можно было сделать при следующем запуске блокнота.

Результат:

Вы также можете сохранить максимальное значение столбца tno в файл и использовать его при следующем запуске.

Вы можете просмотреть эту ссылку от @ ram.sankarasubramanian, чтобы узнать об этом больше.

Даже я подумал об этом, проблема здесь в том, что когда параллельные экземпляры выполняют одно и то же задание, например, если запуск 1 и запуск 2 выполняются почти одновременно. Эта логика не сработает. Итак, если быть точным, нужен что-то вроде транзакционного ключа.

Rocking Surya 25.07.2024 22:21

Если он параллельный, вам нужно использовать столбец идентификаторов с использованием временной таблицы SQL, как предложено @Chris, AFAIK, вам нужно либо запустить его в последовательности, либо использовать столбец идентификаторов, поскольку у pyspark нет столбца идентификаторов или транзакционного ключа.

Rakesh Govindula 26.07.2024 05:23
Ответ принят как подходящий

Я выполнил rownumber() и monotically_increasing_id().

Но они создаются с самого начала с 1 для каждого запуска.

Итак, просто думайте об этом как о транзакционном_ключе. Как этого добиться в pyspark Databricks.

для действительно уникальных номеров и хранения идентификаторов с разницей в блоках данных вы можете использовать: identity_columns

Если вам нужен общий, но гарантированно уникальный, вам нужна либо генерация стиля снежинки (Качество имеет 160-битный вариант, не тестируется через pyspark), либо центральный сервер, который резервирует блоки (как это делает Oracle) и разделы карты.

Это работает, если я работаю над таблицей SQL. Но мне нужно получить их в фрейме данных, чтобы я мог записать их в целевые файлы Json. Итак, здесь я даже не могу использовать временную дельта-таблицу в качестве сцены, потому что мне нужно сгенерировать только эти 3 записи json, а не всю дельта-таблицу, трудно выбрать точные данные.

Rocking Surya 25.07.2024 22:24

Тогда, похоже, вам нужно использовать функцию стиля снежинки, вы можете попробовать функцию «Качество» (должна работать с pyspark, но не будет работать с подключением)

Chris 26.07.2024 10:42

Другие вопросы по теме

Похожие вопросы

Функция Azure с проблемой сети речи Azure AI (WS_OPEN_ERROR_UNDERLYING_IO_OPEN_FAILED)
Я не могу извлечь таблицы .sql CREATE TABLE для каждой таблицы в базе данных SQL Azure в репозитории Azure
Уведомление конвейера Azure DevOps для прямого сообщения команд
Как добавить пользовательские утверждения для пользователей Azure AD
Azure: пакета под названием «dynlm» не существует
Пока включено иерархическое пространство имен, невозможно загрузить большой двоичный объект с метаданными hdi_isfolder
Можете ли вы создать «Потоки пользователей» в Azure с помощью учетной записи с оплатой по мере использования?
Пользовательское утверждение Entra внешнего идентификатора в токене идентификатора клиента, а не в ClaimsPrincipal пользователя веб-API ASP.NET Core
SchemaColumnConvertNotSupportedException: столбец: [Col_Name], физический тип: INT64, логический тип: строка
Невозможно прочитать большие двоичные объекты через BlobContainerClient для больших двоичных объектов с пустыми префиксами папок