У меня есть таблица с логическим полем, которое меня волнует, а также диапазоны дат, которые эти поля актуальны. Эти диапазоны дат могут перекрываться, могут быть полностью внутренними по отношению к другим диапазонам дат и могут иметь промежутки.
Меня интересуют только строки, в которых находится логическое поле true, но мне нужно создать непересекающиеся диапазоны дат, чтобы я мог разумно присоединиться к своим данным, не создавая повторяющихся записей.
то есть мои данные могут выглядеть так:
user | is_relevant | from_date | to_date | description
-----|-------------|------------|------------|-------------
1 | true | 2024-01-01 | 2024-02-01 | month of january
1 | true | 2024-01-02 | 2024-01-03 | subset of january
1 | true | 2024-01-15 | 2024-02-15 | mid-jan to mid-feb
1 | true | 2024-03-01 | 2024-04-01 | distinct date range
Мне нужно добраться до таблицы с двумя строками: одна от 2024-01-01 до 2024-02-15 и другая от 2024-03-01 до 2024-04-01.
Любая помощь вообще будет оценена по достоинству. Если это имеет значение, я запускаю это в Spark SQL, но решение PySpark подойдет.


Чтобы добиться желаемого результата, вам необходимо объединить перекрывающиеся диапазоны дат, где is_relevant истинно. Это можно эффективно сделать в PySpark. Ниже приведен пошаговый подход к объединению перекрывающихся интервалов:
from_date.Вот код PySpark для этого:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, struct
from pyspark.sql.types import DateType
# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()
# Sample data
data = [
(1, True, "2024-01-01", "2024-02-01", "month of january"),
(1, True, "2024-01-02", "2024-01-03", "subset of january"),
(1, True, "2024-01-15", "2024-02-15", "mid-jan to mid-feb"),
(1, True, "2024-03-01", "2024-04-01", "distinct date range")
]
# Create DataFrame
df = spark.createDataFrame(data, ["user", "is_relevant", "from_date", "to_date", "description"])
# Convert string dates to DateType
df = df.withColumn("from_date", col("from_date").cast(DateType())) \
.withColumn("to_date", col("to_date").cast(DateType()))
# Filter only rows where is_relevant is True
df = df.filter(col("is_relevant") == True)
# Collect and sort the intervals
intervals = df.select("from_date", "to_date").orderBy("from_date").collect()
# Function to merge overlapping intervals
def merge_intervals(intervals):
merged = []
for interval in intervals:
if not merged or merged[-1][1] < interval[0]:
merged.append(interval)
else:
merged[-1] = (merged[-1][0], max(merged[-1][1], interval[1]))
return merged
# Apply the merge_intervals function
merged_intervals = merge_intervals([(row["from_date"], row["to_date"]) for row in intervals])
# Convert merged intervals back to DataFrame
merged_df = spark.createDataFrame(merged_intervals, ["from_date", "to_date"])
# Show the result
merged_df.show()
is_relevant равно True, и преобразуйте столбцы даты из строк в DateType.from_date.merge_intervals для объединения перекрывающихся или соседних интервалов. Эта функция перебирает отсортированные интервалы и объединяет их, если они перекрываются.Это решение гарантирует объединение перекрывающихся и смежных диапазонов дат, в результате чего образуются непересекающиеся интервалы, которые можно использовать для обратного объединения с данными без создания повторяющихся записей.
Это классическая gaps-and-islands проблема.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col,
collect_list,
struct,
row_number,
last,
min,
max,
when,
lit,
sum,
)
from pyspark.sql.types import DateType
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()
# Sample data
data = [
(1, True, "2024-01-01", "2024-02-01", "month of january"),
(1, True, "2024-01-02", "2024-01-03", "subset of january"),
(1, True, "2024-01-15", "2024-02-15", "mid-jan to mid-feb"),
(1, True, "2024-03-01", "2024-04-01", "distinct date range")
]
# Create DataFrame
df = spark.createDataFrame(data, ["user", "is_relevant", "from_date", "to_date", "description"])
# Convert string dates to DateType
df = df.withColumn("start", col("from_date").cast(DateType())) \
.withColumn("end", col("to_date").cast(DateType()))
# Filter only rows where is_relevant is True
df = df.where(col("is_relevant"))
df = (
df.withColumn("row_num", row_number().over(Window.orderBy("start", "end")))
.withColumn(
"previous_end",
max(col("end")).over(
Window.partitionBy("user")
.orderBy("start", "end")
.rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
),
)
.withColumn("island_start_indicator", when(col("previous_end") >= col("start"), lit(0)).otherwise(lit(1)))
.withColumn("island_id", sum("island_start_indicator").over(Window.orderBy("start", "end")))
.withColumn("island_min_start", min("start").over(Window.partitionBy('island_id')))
.withColumn("island_max_end", max("end").over(Window.partitionBy('island_id')))
.select('user', 'is_relevant', col('island_min_start').alias('start_date'), col('island_max_end').alias('end_date'))
.dropDuplicates()
)
df.show()
+----+-----------+----------+----------+
|user|is_relevant|start_date| end_date|
+----+-----------+----------+----------+
| 1| true|2024-01-01|2024-02-15|
| 1| true|2024-03-01|2024-04-01|
+----+-----------+----------+----------+
Благодаря вышеизложенному мне удалось запустить это в Spark SQL:
with source_data as (
select
group_key
, thing_i_care_about
, from_date
, date_add(to_date, 1) as closed_to_date
from my_table
)
, windowed as (
select
group_key
, thing_i_care_about
, from_date
, closed_to_date
, lag(closed_to_date) over group_window as previous_to_date
from source_data
window group_window as (
partition by group_key, thing_i_care_about
order by from_date, closed_to_date
)
)
, islands as (
select
group_key
, think_i_care_about
, from_date
, closed_to_date
-- This is where the magic happens, I had to pen/paper this to be sure
, sum(
case when previous_to_date = from_date then 0 else 1 end
) over (
partition by group_key, thing_i_care_about
order by from_date, closed_to_date
rows between unbounded preceding and 0 preceding
) as island_id
, is_basic
from windowed
)
select distinct
group_key
, thing_i_care_about
, min(from_date) over w as island_min_start
, max(closed_to_date) over w as island_max_end
from islands
window w as (
partition by group_key, thing_i_care_about, island_id
)
Примечание: мне нужно было обновиться до закрытого to_date, чтобы previous_to_date = from_date для островов. Я мог бы провести сравнение с from_date - 1, но это было бы уродливее.
Спасибо! С небольшой доработкой у меня все получилось :)