Как создать отдельные диапазоны дат из набора диапазонов в sql

У меня есть таблица с логическим полем, которое меня волнует, а также диапазоны дат, которые эти поля актуальны. Эти диапазоны дат могут перекрываться, могут быть полностью внутренними по отношению к другим диапазонам дат и могут иметь промежутки.

Меня интересуют только строки, в которых находится логическое поле true, но мне нужно создать непересекающиеся диапазоны дат, чтобы я мог разумно присоединиться к своим данным, не создавая повторяющихся записей.

то есть мои данные могут выглядеть так:

user | is_relevant | from_date  | to_date    | description
-----|-------------|------------|------------|-------------
1    | true        | 2024-01-01 | 2024-02-01 | month of january
1    | true        | 2024-01-02 | 2024-01-03 | subset of january
1    | true        | 2024-01-15 | 2024-02-15 | mid-jan to mid-feb
1    | true        | 2024-03-01 | 2024-04-01 | distinct date range

Мне нужно добраться до таблицы с двумя строками: одна от 2024-01-01 до 2024-02-15 и другая от 2024-03-01 до 2024-04-01.

Любая помощь вообще будет оценена по достоинству. Если это имеет значение, я запускаю это в Spark SQL, но решение PySpark подойдет.

ReactJs | Supabase | Добавление данных в базу данных
ReactJs | Supabase | Добавление данных в базу данных
Это и есть ваш редактор таблиц в supabase.👇
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
0
0
81
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Чтобы добиться желаемого результата, вам необходимо объединить перекрывающиеся диапазоны дат, где is_relevant истинно. Это можно эффективно сделать в PySpark. Ниже приведен пошаговый подход к объединению перекрывающихся интервалов:

  1. Сортировка и сбор данных. Сначала соберите диапазоны дат в список, отсортированный по from_date.
  2. Объединить интервалы: перебирать отсортированный список и объединять перекрывающиеся или соседние интервалы.

Вот код PySpark для этого:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, struct
from pyspark.sql.types import DateType

# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()

# Sample data
data = [
    (1, True, "2024-01-01", "2024-02-01", "month of january"),
    (1, True, "2024-01-02", "2024-01-03", "subset of january"),
    (1, True, "2024-01-15", "2024-02-15", "mid-jan to mid-feb"),
    (1, True, "2024-03-01", "2024-04-01", "distinct date range")
]

# Create DataFrame
df = spark.createDataFrame(data, ["user", "is_relevant", "from_date", "to_date", "description"])

# Convert string dates to DateType
df = df.withColumn("from_date", col("from_date").cast(DateType())) \
       .withColumn("to_date", col("to_date").cast(DateType()))

# Filter only rows where is_relevant is True
df = df.filter(col("is_relevant") == True)

# Collect and sort the intervals
intervals = df.select("from_date", "to_date").orderBy("from_date").collect()

# Function to merge overlapping intervals
def merge_intervals(intervals):
    merged = []
    for interval in intervals:
        if not merged or merged[-1][1] < interval[0]:
            merged.append(interval)
        else:
            merged[-1] = (merged[-1][0], max(merged[-1][1], interval[1]))
    return merged

# Apply the merge_intervals function
merged_intervals = merge_intervals([(row["from_date"], row["to_date"]) for row in intervals])

# Convert merged intervals back to DataFrame
merged_df = spark.createDataFrame(merged_intervals, ["from_date", "to_date"])

# Show the result
merged_df.show()

Объяснение:

  1. Подготовка данных: инициализируйте сеанс Spark и создайте DataFrame с образцом данных.
  2. Фильтрация и преобразование столбцов даты. Отфильтруйте DataFrame, чтобы включить только строки, где is_relevant равно True, и преобразуйте столбцы даты из строк в DateType.
  3. Сбор и сортировка интервалов. Соберите диапазоны дат в список и отсортируйте их по from_date.
  4. Объединить интервалы: определите функцию merge_intervals для объединения перекрывающихся или соседних интервалов. Эта функция перебирает отсортированные интервалы и объединяет их, если они перекрываются.
  5. Создать объединенный фрейм данных: преобразуйте объединенные интервалы обратно в фрейм данных и отобразите результат.

Это решение гарантирует объединение перекрывающихся и смежных диапазонов дат, в результате чего образуются непересекающиеся интервалы, которые можно использовать для обратного объединения с данными без создания повторяющихся записей.

Ответ принят как подходящий

Это классическая gaps-and-islands проблема.

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    collect_list,
    struct,
    row_number,
    last,
    min,
    max,
    when,
    lit,
    sum,
)
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()

# Sample data
data = [
    (1, True, "2024-01-01", "2024-02-01", "month of january"),
    (1, True, "2024-01-02", "2024-01-03", "subset of january"),
    (1, True, "2024-01-15", "2024-02-15", "mid-jan to mid-feb"),
    (1, True, "2024-03-01", "2024-04-01", "distinct date range")
]

# Create DataFrame
df = spark.createDataFrame(data, ["user", "is_relevant", "from_date", "to_date", "description"])

# Convert string dates to DateType
df = df.withColumn("start", col("from_date").cast(DateType())) \
       .withColumn("end", col("to_date").cast(DateType()))

# Filter only rows where is_relevant is True
df = df.where(col("is_relevant"))

df = (
    df.withColumn("row_num", row_number().over(Window.orderBy("start", "end")))
    .withColumn(
        "previous_end",
        max(col("end")).over(
            Window.partitionBy("user")
            .orderBy("start", "end")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
        ),
    )
    .withColumn("island_start_indicator", when(col("previous_end") >= col("start"), lit(0)).otherwise(lit(1)))
    .withColumn("island_id", sum("island_start_indicator").over(Window.orderBy("start", "end")))
    .withColumn("island_min_start", min("start").over(Window.partitionBy('island_id')))
    .withColumn("island_max_end", max("end").over(Window.partitionBy('island_id')))
    .select('user', 'is_relevant', col('island_min_start').alias('start_date'), col('island_max_end').alias('end_date'))
    .dropDuplicates()
)

df.show()

+----+-----------+----------+----------+
|user|is_relevant|start_date|  end_date|
+----+-----------+----------+----------+
|   1|       true|2024-01-01|2024-02-15|
|   1|       true|2024-03-01|2024-04-01|
+----+-----------+----------+----------+

Спасибо! С небольшой доработкой у меня все получилось :)

WhatAmIDoing 23.05.2024 05:28

Благодаря вышеизложенному мне удалось запустить это в Spark SQL:

with source_data as (
    select
        group_key
        , thing_i_care_about
        , from_date
        , date_add(to_date, 1) as closed_to_date
    from my_table
)

, windowed as (
    select
        group_key
        , thing_i_care_about
        , from_date
        , closed_to_date
        , lag(closed_to_date) over group_window as previous_to_date
    from source_data
    window group_window as (
        partition by group_key, thing_i_care_about
        order by from_date, closed_to_date
    )
)

, islands as (
    select
        group_key
        , think_i_care_about
        , from_date
        , closed_to_date
        -- This is where the magic happens, I had to pen/paper this to be sure
        , sum(
            case when previous_to_date = from_date then 0 else 1 end
        ) over (
            partition by group_key, thing_i_care_about
            order by from_date, closed_to_date
            rows between unbounded preceding and 0 preceding
        ) as island_id
        , is_basic
    from windowed
)

select distinct
    group_key
    , thing_i_care_about
    , min(from_date) over w as island_min_start
    , max(closed_to_date) over w as island_max_end
from islands
window w as (
    partition by group_key, thing_i_care_about, island_id
)

Примечание: мне нужно было обновиться до закрытого to_date, чтобы previous_to_date = from_date для островов. Я мог бы провести сравнение с from_date - 1, но это было бы уродливее.

Другие вопросы по теме

Похожие вопросы

Выберите уникальный второй столбец для каждого первого столбца с минимальным значением третьего столбца
Снежинка SQL | Как суммировать различные значения за один проход?
Как фильтровать идентификаторы, где значение другого столбца одинаково для всех строк?
SQL – множественное ЛЕВОЕ ВНЕШНЕЕ СОЕДИНЕНИЕ
У меня есть запрос на создание последовательности дат на основе заданной даты начала. Я хочу выполнить тот же запрос для всех остальных дат и объединить результаты
Восстановите данные из таблицы, воссозданной несколько раз в снежинке
SQL-соединение, отображение нескольких значений в одной строке
Обход дерева с самым длинным путем и несколькими ветвями в реалистичные сроки
Как написать SQL-запрос, который будет выводить валюту каждого первого числа каждого месяца в 2024 году?
Oracle: Как определить триггер, который после каждой вставки вставляет еще одну строку в ту же таблицу?