Автоматически созданный уникальный идентификатор типа bigint в блоках данных Azure pyspark

Я хочу создать автоматически сгенерированный идентификатор, который должен быть уникальным для всего задания и иметь тип Integer. Если я использую monotonically_increasing_id, он генерирует уникальный идентификатор, но только для конкретного запуска задания, при запуске следующего пакета он конфликтует с идентификатором.

У меня есть пара столбцов, таких как patchid, которые представляют собой отметку даты и времени, и если я соединим «monotonically_increasing_id», это может сработать, но из-за искрового параллелизма и диапазона bigint могут возникнуть проблемы.

Я также пробовал раздел окна с runid (идентификатором задания), но он не дает мне уникального значения.

Я перечисляю все варианты, которые пробовал, любая помощь может быть полезна.

df = spark.table("table")
windowSpec = Window.partitionBy("runid", "batchid").orderBy("batchid")

df.withColumn("id", (hash(concat(col("id"), lit("_"), col("batchid"), lit("_"), col("runid"))) % 10000000000).cast("bigint"))


df.withColumn("id", (abs(hash(concat(col("batchid"), lit("_"), monotonically_increasing_id(), lit("_")))) % 10000000000).cast("bigint"))

df = df.withColumn(
"Id",
(hash(concat(
    format_string("%010d", unix_timestamp(current_timestamp()) * 1000),  # Timestamp in milliseconds
    format_string("%05d", monotonically_increasing_id() % 100000),       # Ensuring uniqueness for the same millisecond, restrict to 5 digits
    concat_ws("_", *unique_cols)                                        # Concatenate the unique columns
)) + lit(2**31)).cast("bigint"))

df = df.withColumn("unique_id", 
               (monotonically_increasing_id() * 1000000000) +  
               (row_number().over(windowSpec) * 100000) +       
               col("runid").cast("bigint") +                    
               col("batchid").cast("bigint")) 
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
127
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

monotonically_increasing_id() не гарантирует уникальные значения для разных запусков задания или нескольких этапов одного выполнения задания.

Не рекомендуется вручную объединять столбцы с помощью monotonically_increasing_id() или других функций, таких как current_timestamp() и row_number(), для создания уникального идентификатора. Это может не гарантировать уникальность, особенно при параллельной работе.

Вместо этого вы можете рассмотреть возможность использования комбинации нескольких столбцов для создания составного ключа, обеспечивающего уникальность.

Я попробовал следующий подход:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit, row_number
from pyspark.sql.window import Window
data = [
    ("2022-01-01", "Job1", "Data1"),
    ("2022-01-01", "Job1", "Data2"),
    ("2022-01-02", "Job1", "Data3"),
    ("2022-01-02", "Job2", "Data4"),
    ("2022-01-03", "Job2", "Data5"),
    ("2022-01-03", "Job2", "Data6"),
]
df = spark.createDataFrame(data, ["batchid", "runid", "data"])
windowSpec = Window.partitionBy("runid", "batchid").orderBy("batchid")
df = df.withColumn(
    "unique_id",
    concat(col("runid"), lit("_"), col("batchid"), lit("_"), row_number().over(windowSpec))
)
df.show()

Результат:

+----------+-----+-----+-----------------+
|   batchid|runid| data|        unique_id|
+----------+-----+-----+-----------------+
|2022-01-01| Job1|Data1|Job1_2022-01-01_1|
|2022-01-01| Job1|Data2|Job1_2022-01-01_2|
|2022-01-02| Job1|Data3|Job1_2022-01-02_1|
|2022-01-02| Job2|Data4|Job2_2022-01-02_1|
|2022-01-03| Job2|Data6|Job2_2022-01-03_1|
|2022-01-03| Job2|Data5|Job2_2022-01-03_2|
+----------+-----+-----+-----------------+

В приведенном выше коде я использовал комбинацию runid, patchid и row_number() внутри оконной функции для генерации уникального идентификатора. Этот подход обеспечивает уникальность при различных запусках заданий и обеспечивает параллелизм за счет использования оконной функции для назначения уникальных номеров внутри каждого раздела, определенных runid и Battid.

Используя UUIDs (org.apache.spark.sql.functions.uuid()), UUID гарантируют уникальность для разных узлов и запусков заданий.

Вот пример:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
data = [("Alice", 34), ("Bob", 45), ("Charlie", 37)]
df = spark.createDataFrame(data, ["name", "age"])
df_with_uuid = df.withColumn("uuid", expr("uuid()"))
df_with_uuid.show(truncate=False)

Полученные результаты:

+-------+---+------------------------------------+
|name   |age|uuid                                |
+-------+---+------------------------------------+
|Alice  |34 |1634bc71-bec0-4efe-a2f4-793c363110d5|
|Bob    |45 |7ec2f25f-da0e-42ac-a52a-c665ccc12350|
|Charlie|37 |b2881250-8b5d-4ec8-8121-d6ec7690416e|
+-------+---+------------------------------------+

Как вы упомянули, проблема в том, что идентификатор должен иметь тип bigint. Вы можете попробовать создать уникальный идентификатор типа bigint, гарантируя при этом уникальность при выполнении заданий, используя комбинацию столбцов и хеширование.

Я попробовал функцию md5 для хэширования объединенной строки, но результат был NULL. Это связано с тем, что функция md5 в PySpark не поддерживает значения хеширования с типом данных BigInt. Чтобы обойти эту проблему, вы можете привести столбец ID к строковому типу данных перед применением функции md5.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5
df = df.withColumn("unique_id", concat_ws("_", col("runid"), col("batchid"), col("data")))
df = df.withColumn("id", md5(col("unique_id").cast("string")))
+----------+-----+-----+--------------------+--------------------+
|   batchid|runid| data|           unique_id|                  id|
+----------+-----+-----+--------------------+--------------------+
|2022-01-01| Job1|Data1|Job1_2022-01-01_D...|fffdd2babb8e9b60b...|
|2022-01-01| Job1|Data2|Job1_2022-01-01_D...|b8a430deba3f61223...|
|2022-01-02| Job1|Data3|Job1_2022-01-02_D...|3af865f308eff7fb8...|
|2022-01-02| Job2|Data4|Job2_2022-01-02_D...|56537c8a822bd1b7f...|
|2022-01-03| Job2|Data5|Job2_2022-01-03_D...|092dd4548f93e0070...|
|2022-01-03| Job2|Data6|Job2_2022-01-03_D...|8e7171be80fe94097...|
+----------+-----+-----+--------------------+--------------------+

Приведя столбец ID к строке (col("unique_id").cast("string")), функция md5 получит строковый ввод.

Спасибо, но проблема в том, что идентификатор должен иметь тип bigint, это правило.

ashK 06.05.2024 07:33

это все еще строка, но мне нужен bigint

ashK 06.05.2024 12:42

Большой! Приятно слышать, что проблема решена

Dileep Raj Narayan Thumula 08.05.2024 07:14
Ответ принят как подходящий

Я попробовал это, и, похоже, работает хорошо.

scale = 100000000000000 #10^14
df.withColumn("id",(monotonically_increasing_id() * scale +col("batch").cast("bigint")).cast("bigint"),)

Это работает и не обнаруживает коллизий. Пакет — это временная метка.

Другие вопросы по теме