Взорвать даты и заполнить строки в фрейме данных pyspark

У меня есть этот фрейм данных:

+---+----------+------+
| id|      date|amount|
+---+----------+------+
|123|2022-11-11|100.00|
|123|2022-11-12|100.00|
|123|2022-11-13|100.00|
|123|2022-11-14|200.00|
|456|2022-11-14|300.00|
|456|2022-11-15|300.00|
|456|2022-11-16|300.00|
|789|2022-11-11|400.00|
|789|2022-11-12|500.00|
+---+----------+------+

Мне нужно создавать новые записи для каждой даты до current_date() - 2. И значение, которое будет заполнено, должно быть самым последним.

Например, если date_sub(current_date(), 2) == "2022-11-16", то мне нужен следующий фрейм данных:

+------+----------+-------+
|id    |    date  | amount|
+------+----------+-------+
|   123|2022-11-11|100,00 |
|   123|2022-11-12|100,00 |
|   123|2022-11-13|100,00 |
|   123|2022-11-14|200,00 |
|   123|2022-11-15|200,00 |
|   123|2022-11-16|200,00 |
|   456|2022-11-14|300,00 |
|   456|2022-11-15|300,00 |
|   456|2022-11-16|300,00 |
|   789|2022-11-11|400,00 |
|   789|2022-11-12|500,00 |
|   789|2022-11-13|500,00 |
|   789|2022-11-14|500,00 |
|   789|2022-11-15|500,00 |
|   789|2022-11-16|500,00 |
+------+----------+-------+
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("Complete Rows").getOrCreate()

from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, IntegerType, DateType, DecimalType
from datetime import datetime
from decimal import Decimal

vdata = [
    (123,datetime.strptime('2022-11-11','%Y-%m-%d'),Decimal(100)),
    (123,datetime.strptime('2022-11-12','%Y-%m-%d'),Decimal(100)),
    (123,datetime.strptime('2022-11-13','%Y-%m-%d'),Decimal(100)),
    (123,datetime.strptime('2022-11-14','%Y-%m-%d'),Decimal(200)),
    (456,datetime.strptime('2022-11-14','%Y-%m-%d'),Decimal(300)),
    (456,datetime.strptime('2022-11-15','%Y-%m-%d'),Decimal(300)),
    (456,datetime.strptime('2022-11-16','%Y-%m-%d'),Decimal(300)),
    (789,datetime.strptime('2022-11-11','%Y-%m-%d'),Decimal(400)),
    (789,datetime.strptime('2022-11-12','%Y-%m-%d'),Decimal(500))]

schema = StructType([
    StructField("id",IntegerType(),False),
    StructField("date",DateType(),False),
    StructField("amount",DecimalType(10,2),False)])

df = spark.createDataFrame(vdata,schema)

df.show()

Я попытался определить максимальную дату для каждого идентификатора, затем определить последнее значение для этой максимальной даты и выполнить F.expr(sequence), чтобы создать список записей, а затем взорвать, чтобы создать строки, но это не очень хорошо работает. Спасибо за любую помощь, которую вы можете дать!

14 Задание: Типы данных и структуры данных Python для DevOps
14 Задание: Типы данных и структуры данных Python для DevOps
проверить тип данных используемой переменной, мы можем просто написать: your_variable=100
Python PyPDF2 - запись метаданных PDF
Python PyPDF2 - запись метаданных PDF
Python скрипт, который будет записывать метаданные в PDF файл, для этого мы будем использовать PDF ридер из библиотеки PyPDF2 . PyPDF2 - это...
Переменные, типы данных и операторы в Python
Переменные, типы данных и операторы в Python
В Python переменные используются как место для хранения значений. Пример переменной формы:
Почему Python - идеальный выбор для проекта AI и ML
Почему Python - идеальный выбор для проекта AI и ML
Блог, которым поделился Harikrishna Kundariya в нашем сообществе Developer Nation Community.
Как автоматически добавлять котировки в заголовки запросов с помощью PyCharm
Как автоматически добавлять котировки в заголовки запросов с помощью PyCharm
Как автоматически добавлять котировки в заголовки запросов с помощью PyCharm
Анализ продукта магазина на Tokopedia
Анализ продукта магазина на Tokopedia
Tokopedia - это место, где продавцы могут продавать свои товары. Товар должен быть размещен на витрине, чтобы покупателям было легче найти товар...
1
0
103
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мне удалось найти следующее решение.
Для ясности я разделил его на три этапа; конечно, вы можете написать меньше строк кода, если сделаете их более компактными.

1) Поиск

Создайте таблицу поиска со всеми необходимыми датами (как присутствующими, так и нет) для каждого идентификатора.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

lookup = (df
          .groupby('id')
          .agg(
            F.min('date').alias('start_date'),
            F.date_sub(F.current_date(), 2).alias('end_date')
          )
          .select('id', F.explode(F.expr('sequence(start_date, end_date, interval 1 day)')).alias('date'))
         )
lookup.show()

+---+----------+
| id|      date|
+---+----------+
|123|2022-11-11|
|123|2022-11-12|
|123|2022-11-13|
|123|2022-11-14|
|123|2022-11-15|
|123|2022-11-16|
|456|2022-11-14|
|456|2022-11-15|
|456|2022-11-16|
|789|2022-11-11|
|789|2022-11-12|
|789|2022-11-13|
|789|2022-11-14|
|789|2022-11-15|
|789|2022-11-16|
+---+----------+

2) Присоединяйтесь

После этого мы присоединяем таблицу поиска к нашему исходному фрейму данных: таким образом, необходимые строки добавляются с переменной amount, установленной как нулевая.

df = df.join(lookup, on=['id', 'date'], how='outer')
df.show()

+---+----------+------+
| id|      date|amount|
+---+----------+------+
|123|2022-11-11| 100.0|
|123|2022-11-12| 100.0|
|123|2022-11-13| 100.0|
|123|2022-11-14| 200.0|
|123|2022-11-15|  null|
|123|2022-11-16|  null|
|456|2022-11-14| 300.0|
|456|2022-11-15| 300.0|
|456|2022-11-16| 300.0|
|789|2022-11-11| 400.0|
|789|2022-11-12| 500.0|
|789|2022-11-13|  null|
|789|2022-11-14|  null|
|789|2022-11-15|  null|
|789|2022-11-16|  null|
+---+----------+------+

3) last функция

Мы используем функцию last с ignorenulls=True для получения последнего ненулевого значения в окне, разделенном по идентификатору и упорядоченном по дате.

w = Window.partitionBy('id').orderBy('date').rowsBetween(Window.unboundedPreceding, 0)

df = df.withColumn('amount', F.last('amount', ignorenulls=True).over(w))
df.show()

+---+----------+------+
| id|      date|amount|
+---+----------+------+
|123|2022-11-11| 100.0|
|123|2022-11-12| 100.0|
|123|2022-11-13| 100.0|
|123|2022-11-14| 200.0|
|123|2022-11-15| 200.0|
|123|2022-11-16| 200.0|
|456|2022-11-14| 300.0|
|456|2022-11-15| 300.0|
|456|2022-11-16| 300.0|
|789|2022-11-11| 400.0|
|789|2022-11-12| 500.0|
|789|2022-11-13| 500.0|
|789|2022-11-14| 500.0|
|789|2022-11-15| 500.0|
|789|2022-11-16| 500.0|
+---+----------+------+

Невероятный! Это работало как перчатка! И вы написали просто и поучительно, молодец! Моя жизненная цель - быть такой же, как ты, когда я вырасту! Успех!

TRCL 18.11.2022 17:50

Спасибо за ваши слова, вы меня очень улыбнули :)

Ric S 19.11.2022 00:53

Другие вопросы по теме