Я хочу создать автоматически сгенерированный идентификатор, который должен быть уникальным для всего задания и иметь тип Integer. Если я использую monotonically_increasing_id
, он генерирует уникальный идентификатор, но только для конкретного запуска задания, при запуске следующего пакета он конфликтует с идентификатором.
У меня есть пара столбцов, таких как patchid, которые представляют собой отметку даты и времени, и если я соединим «monotonically_increasing_id», это может сработать, но из-за искрового параллелизма и диапазона bigint могут возникнуть проблемы.
Я также пробовал раздел окна с runid (идентификатором задания), но он не дает мне уникального значения.
Я перечисляю все варианты, которые пробовал, любая помощь может быть полезна.
df = spark.table("table")
windowSpec = Window.partitionBy("runid", "batchid").orderBy("batchid")
df.withColumn("id", (hash(concat(col("id"), lit("_"), col("batchid"), lit("_"), col("runid"))) % 10000000000).cast("bigint"))
df.withColumn("id", (abs(hash(concat(col("batchid"), lit("_"), monotonically_increasing_id(), lit("_")))) % 10000000000).cast("bigint"))
df = df.withColumn(
"Id",
(hash(concat(
format_string("%010d", unix_timestamp(current_timestamp()) * 1000), # Timestamp in milliseconds
format_string("%05d", monotonically_increasing_id() % 100000), # Ensuring uniqueness for the same millisecond, restrict to 5 digits
concat_ws("_", *unique_cols) # Concatenate the unique columns
)) + lit(2**31)).cast("bigint"))
df = df.withColumn("unique_id",
(monotonically_increasing_id() * 1000000000) +
(row_number().over(windowSpec) * 100000) +
col("runid").cast("bigint") +
col("batchid").cast("bigint"))
monotonically_increasing_id()
не гарантирует уникальные значения для разных запусков задания или нескольких этапов одного выполнения задания.
Не рекомендуется вручную объединять столбцы с помощью monotonically_increasing_id()
или других функций, таких как current_timestamp()
и row_number()
, для создания уникального идентификатора. Это может не гарантировать уникальность, особенно при параллельной работе.
Вместо этого вы можете рассмотреть возможность использования комбинации нескольких столбцов для создания составного ключа, обеспечивающего уникальность.
Я попробовал следующий подход:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit, row_number
from pyspark.sql.window import Window
data = [
("2022-01-01", "Job1", "Data1"),
("2022-01-01", "Job1", "Data2"),
("2022-01-02", "Job1", "Data3"),
("2022-01-02", "Job2", "Data4"),
("2022-01-03", "Job2", "Data5"),
("2022-01-03", "Job2", "Data6"),
]
df = spark.createDataFrame(data, ["batchid", "runid", "data"])
windowSpec = Window.partitionBy("runid", "batchid").orderBy("batchid")
df = df.withColumn(
"unique_id",
concat(col("runid"), lit("_"), col("batchid"), lit("_"), row_number().over(windowSpec))
)
df.show()
Результат:
+----------+-----+-----+-----------------+
| batchid|runid| data| unique_id|
+----------+-----+-----+-----------------+
|2022-01-01| Job1|Data1|Job1_2022-01-01_1|
|2022-01-01| Job1|Data2|Job1_2022-01-01_2|
|2022-01-02| Job1|Data3|Job1_2022-01-02_1|
|2022-01-02| Job2|Data4|Job2_2022-01-02_1|
|2022-01-03| Job2|Data6|Job2_2022-01-03_1|
|2022-01-03| Job2|Data5|Job2_2022-01-03_2|
+----------+-----+-----+-----------------+
В приведенном выше коде я использовал комбинацию runid, patchid и row_number()
внутри оконной функции для генерации уникального идентификатора. Этот подход обеспечивает уникальность при различных запусках заданий и обеспечивает параллелизм за счет использования оконной функции для назначения уникальных номеров внутри каждого раздела, определенных runid и Battid.
Используя UUIDs (org.apache.spark.sql.functions.uuid())
, UUID гарантируют уникальность для разных узлов и запусков заданий.
Вот пример:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
data = [("Alice", 34), ("Bob", 45), ("Charlie", 37)]
df = spark.createDataFrame(data, ["name", "age"])
df_with_uuid = df.withColumn("uuid", expr("uuid()"))
df_with_uuid.show(truncate=False)
Полученные результаты:
+-------+---+------------------------------------+
|name |age|uuid |
+-------+---+------------------------------------+
|Alice |34 |1634bc71-bec0-4efe-a2f4-793c363110d5|
|Bob |45 |7ec2f25f-da0e-42ac-a52a-c665ccc12350|
|Charlie|37 |b2881250-8b5d-4ec8-8121-d6ec7690416e|
+-------+---+------------------------------------+
Как вы упомянули, проблема в том, что идентификатор должен иметь тип bigint. Вы можете попробовать создать уникальный идентификатор типа bigint, гарантируя при этом уникальность при выполнении заданий, используя комбинацию столбцов и хеширование.
Я попробовал функцию md5
для хэширования объединенной строки, но результат был NULL. Это связано с тем, что функция md5 в PySpark не поддерживает значения хеширования с типом данных BigInt
. Чтобы обойти эту проблему, вы можете привести столбец ID к строковому типу данных перед применением функции md5.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5
df = df.withColumn("unique_id", concat_ws("_", col("runid"), col("batchid"), col("data")))
df = df.withColumn("id", md5(col("unique_id").cast("string")))
+----------+-----+-----+--------------------+--------------------+
| batchid|runid| data| unique_id| id|
+----------+-----+-----+--------------------+--------------------+
|2022-01-01| Job1|Data1|Job1_2022-01-01_D...|fffdd2babb8e9b60b...|
|2022-01-01| Job1|Data2|Job1_2022-01-01_D...|b8a430deba3f61223...|
|2022-01-02| Job1|Data3|Job1_2022-01-02_D...|3af865f308eff7fb8...|
|2022-01-02| Job2|Data4|Job2_2022-01-02_D...|56537c8a822bd1b7f...|
|2022-01-03| Job2|Data5|Job2_2022-01-03_D...|092dd4548f93e0070...|
|2022-01-03| Job2|Data6|Job2_2022-01-03_D...|8e7171be80fe94097...|
+----------+-----+-----+--------------------+--------------------+
Приведя столбец ID к строке (col("unique_id").cast("string"))
, функция md5
получит строковый ввод.
это все еще строка, но мне нужен bigint
Большой! Приятно слышать, что проблема решена
Я попробовал это, и, похоже, работает хорошо.
scale = 100000000000000 #10^14
df.withColumn("id",(monotonically_increasing_id() * scale +col("batch").cast("bigint")).cast("bigint"),)
Это работает и не обнаруживает коллизий. Пакет — это временная метка.
Спасибо, но проблема в том, что идентификатор должен иметь тип bigint, это правило.