У меня есть набор данных временных рядов в виде поляры (v0.20.6) pl.DataFrame, который имеет сильную сезонность, которая хорошо прогнозируется и моделируется с помощью (линейных) прогнозов на один час вперед. На данный момент это невероятно медленно, вызывает проблемы с памятью и, в конечном итоге, сбой ядра Python.
В качестве примера я использую строку pl.DataFrame размером ~2 м с ~300 столбцами, которая представляет собой набор данных таймсерии с ежеминутной частотой. На этих данных я выполняю
def ols_slope(y: pl.Expr) -> pl.Expr:
# Calculate linear regression slope
x = y.rank("ordinal")
numerator = ((x - x.mean())*(y - y.mean())).sum()
denominator = ((x - x.mean())**2).sum()
return numerator / denominator
def ols_offset(y: pl.Expr) -> pl.Expr:
# Calculate linear regression offset
x = y.rank("ordinal")
numerator = ((x - x.mean())*(y - y.mean())).sum()
denominator = ((x - x.mean())**2).sum()
return (numerator / denominator) * x.mean() - y.mean()
в столбце date_utc+value, чтобы получить локальную линейную подгонку с помощью
raw_data = (
pl.read_ipc("../data_vault_1min.feather")
.with_columns(
[
pl.col("A1").rolling_map(ols_slope, window_size=60, min_periods=3).alias("A1_hourly_lin_pred"),
pl.col("A1").rolling_map(ols_slope, window_size=60, min_periods=3).alias("A1_hourly_lin_pred")
]
)
)
Насколько я вижу, под капотом это выполнение group_by_dynamic(), которое, как я ожидаю, будет достаточно производительным. Однако эта единственная операция всегда исчерпывает всю мою память и приводит к сбою ядра.
Он хорошо работает с небольшим подмножеством данных <1 тыс. строк, но уже для ~ 50 тыс. строк у меня это занимает более 5 минут и использует все мои 32 ГБ памяти, которые у меня есть.
Помимо моего ответа ниже, возможно, стоит взглянуть на расширение Polars_ds.
Посмотрю - на первый взгляд не имеет отношения к данному проекту, но обязательно пригодится на будущее!
обновил приведенное выше, чтобы сделать его намного более эффективным: def ols_reg(x: pl.Expr, y: pl.Expr, pred_dist: pl.Expr) -> pl.Expr: # Calculate linear regression a * x + b n = y.count() x_sum = x.sum() y_sum = y.sum() x_s_sq = (x**2).sum() xy_sum = (x * y).sum() slope = (n * xy_sum - x_sum * y_sum) / (n * x_s_sq - x_sum**2 + pl.lit(1e-5)) offset = (y_sum - slope * x_sum) / n return slope * (n + pred_dist) + offset





Как отметил @jqurious в комментариях, вместо этого вы можете попробовать использовать pl.DataFrame.rolling.
import polars as pl
import numpy as np
from datetime import datetime, timedelta
N = 100_000
start_date = datetime(2024, 1, 1)
end_date = start_date + timedelta(days=N)
df = pl.DataFrame({
"date": np.arange(start_date, end_date, timedelta(days=1)),
"price": np.linspace(1, 10, N) ** 3,
})
pl.DataFrame.rollingДля агрегирования остальных столбцов в фрейме данных мы используем pl.Expr.last. Это позволит сохранить столбцы незатронутыми агрегацией.
Более того, по состоянию на апрель 2024 года pl.DataFrame.rolling не поддерживает параметры min_periods, как pl.Expr.rolling_map . Тем не менее, мы можем получить такое поведение, используя конструкцию pl.when().then().
(
df
.rolling(index_column = "date", period = "60d", check_sorted=False)
.agg(
pl.exclude("date").last(),
pl.when(pl.len() >= 3).then(ols_slope(pl.col("price")).alias("price_slope")),
pl.when(pl.len() >= 3).then(ols_offset(pl.col("price")).alias("price_offset")),
)
)
shape: (100_000, 4)
┌─────────────────────┬────────────┬─────────────┬──────────────┐
│ date ┆ price ┆ price_slope ┆ price_offset │
│ --- ┆ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ f64 ┆ f64 ┆ f64 │
╞═════════════════════╪════════════╪═════════════╪══════════════╡
│ 2024-01-01 00:00:00 ┆ 1.0 ┆ null ┆ null │
│ 2024-01-02 00:00:00 ┆ 1.00027 ┆ null ┆ null │
│ 2024-01-03 00:00:00 ┆ 1.00054 ┆ 0.00027 ┆ -0.99973 │
│ 2024-01-04 00:00:00 ┆ 1.00081 ┆ 0.00027 ┆ -0.99973 │
│ 2024-01-05 00:00:00 ┆ 1.00108 ┆ 0.00027 ┆ -0.99973 │
│ … ┆ … ┆ … ┆ … │
│ 2297-10-11 00:00:00 ┆ 999.892003 ┆ 0.026984 ┆ -998.272825 │
│ 2297-10-12 00:00:00 ┆ 999.919001 ┆ 0.026984 ┆ -998.299794 │
│ 2297-10-13 00:00:00 ┆ 999.946 ┆ 0.026985 ┆ -998.326764 │
│ 2297-10-14 00:00:00 ┆ 999.973 ┆ 0.026985 ┆ -998.353734 │
│ 2297-10-15 00:00:00 ┆ 1000.0 ┆ 0.026986 ┆ -998.380705 │
└─────────────────────┴────────────┴─────────────┴──────────────┘
Для N = 100_000 на моей машине это занимает ~340 мс — в отличие от ~12,2 с для реализации с использованием pl.Expr.rolling_map.
Спасибо, Херикс, сработало! Единственное, что мне нужно было изменить, это добавить .sort("date"), так как мои поляры 0.20.6 не принимали флаг .rolling(..., check_sorted=False) ¯_(ツ)_/¯
Я считаю, что
rolling_mapимеет свою собственную реализацию (она материализует реальные объекты Series и передает их функции) - отсюда и предупреждения о производительности в документации. Поскольку ваша логика состоит из полярных выражений, нельзя ли это сделать с помощью.rolling()?