Моя цель - показать данные (полученные из файла csv) за каждые 15 минут в день.
Решение, которое я придумал, - это запрос sql, который создает нужные мне данные:
select
dateadd(minute, datediff(minute, 0, cast ([date] + ' ' + [time] as datetime2) ) / 15 * 15, 0) as dateInterval,
SecurityDesc,
StartPrice,
SUM(CAST(TradedVolume as decimal(18,2))) as totalTradedVolume,
SUM(cast(NumberOfTrades as int)) as totalNumberOfTrades,
ROW_NUMBER() over(PARTITION BY dateadd(minute, datediff(minute, 0, cast ([date] + ' ' + [time] as datetime) ) / 15 * 15, 0) ORDER BY Date) as rn
from MyTable
group by [date],[time],SecurityDesc,StartPrice
Но как только я хочу использовать это в моем коде на Python для Spark, он жалуется на dateiff / dateadd и даже приводит к datetime.
Я понимаю, что, вероятно, не вижу функций sql, но я импортировал:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pyspark.sql.functions as F
from datetime import datetime as d
from pyspark.sql.functions import datediff, to_date, lit
Что мне делать, чтобы все заработало? Я предпочитаю, чтобы мой запрос работал, если не как вообще я могу отображать агрегированные данные за каждые 15 минут в искровом питоне?
ОБНОВЛЕНИЕ: ищу результат, например:
Вы импортировали функции с псевдонимом (что, на мой взгляд, является хорошей практикой):
import pyspark.sql.functions as F
Это означает, что вам нужно использовать переменную F
для использования импортированных функций, таких как F.to_date
. Используемые вами функции являются функциями SQL-запросов и не относятся к фактическим функциям, доступным в pyspark.sql.functions (список доступных функций см. В документации здесь)
Чтобы решить вашу проблему в Spark, я бы использовал dataFrame, а затем работал бы над ним, чтобы вычислить ваши результаты с помощью искровых функций.
P.S в следующий раз лучше опубликовать фактическое сообщение об ошибке, чем заявлять, что искра "жалуется";)