Я создал фрейм данных pandas следующим образом:
import pandas as pd
import numpy as np
ds = { 'trend' : [1,1,1,1,2,2,3,3,3,3,3,3,4,4,4,4,4], 'price' : [23,43,56,21,43,55,54,32,9,12,11,12,23,3,2,1,1]}
df = pd.DataFrame(data=ds)
Кадр данных выглядит следующим образом:
display(df)
trend price
0 1 23
1 1 43
2 1 56
3 1 21
4 2 43
5 2 55
6 3 54
7 3 32
8 3 9
9 3 12
10 3 11
11 3 12
12 4 23
13 4 3
14 4 2
15 4 1
16 4 1
Я сохранил фрейм данных в файле .csv с именем df.csv
:
df.to_csv("df.csv", index = False)
Затем я создал функцию, которая рассчитывает индекс относительной силы (RSI — см.: https://www.investopedia.com/terms/r/rsi.asp):
def get_RSI(df, column, time_window):
"""Return the RSI indicator for the specified time window."""
diff = df[column].diff(1)
# This preservers dimensions off diff values.
up_chg = 0 * diff
down_chg = 0 * diff
# Up change is equal to the positive difference, otherwise equal to zero.
up_chg[diff > 0] = diff[diff > 0]
# Down change is equal to negative deifference, otherwise equal to zero.
down_chg[diff < 0] = diff[diff < 0]
# We set com = time_window-1 so we get decay alpha=1/time_window.
up_chg_avg = up_chg.ewm(com=time_window - 1,
min_periods=time_window).mean()
down_chg_avg = down_chg.ewm(com=time_window - 1,
min_periods=time_window).mean()
RS = abs(up_chg_avg / down_chg_avg)
df['RSI'] = 100 - 100 / (1 + RS)
df = df[['RSI']]
return df
Мне нужно создать новое поле под названием RSI
, которое:
price
, наблюдаемый на каждой итерации и последней
цены (длина RSI в этом примере равна 3), наблюдавшиеся в предыдущих трендах.Например:
Затем я написал этот код:
rsi = []
for i in range(len(df)):
ds = pd.read_csv("df.csv", nrows=i+1)
print(ds.info())
d = ds.groupby(['trend'], as_index=False).agg(
{'price':'last'})
get_RSI(d,'price',3)
rsi.append(d['RSI'].iloc[-1])
df['RSI'] = rsi
Набор данных выглядит правильно:
display(df)
trend price RSI
0 1 23 NaN
1 1 43 NaN
2 1 56 NaN
3 1 21 NaN
4 2 43 NaN
5 2 55 NaN
6 3 54 NaN
7 3 32 NaN
8 3 9 NaN
9 3 12 NaN
10 3 11 NaN
11 3 12 NaN
12 4 23 47.667343
13 4 3 28.631579
14 4 2 28.099174
15 4 1 27.586207
16 4 1 27.586207
Проблема в том, что мне нужно обработать около 4 миллионов записей, и это займет примерно 60 часов.
Кто-нибудь знает, как получить те же результаты быстро и эффективно?
да, конечно, это возможно
обновил код, теперь 4 миллиона рассчитываются за две минуты.
В вопросе не было до конца ясно, чего пытался достичь анализ — существует высокая вероятность того, что существует более эффективный метод, если это понять.
Используя приведенный ниже подход, мне удалось сократить время с 5 часов до 35 минут в смоделированном наборе данных, при этом используя ту же методологию.
Я смоделировал ваш набор данных 4 м следующим образом:
import random
from tqdm import tqdm
random.seed(1337)
trends = [x for x in range(1000)]
ds = {
# sorted as I assume the 'trend' is the time indicator.
"trend": sorted(
[trends[int((random.random() * 1000))] for _ in tqdm(range(int(4e6)))]
),
"price": [random.random() * 100 for _ in tqdm(range(int(4e6)))],
}
df = pd.DataFrame(data=ds)
df.to_csv(
"fake_data.csv", index=True
) # reccommend keeping index to keep track of the time series
Прежде чем перебирать df, я создал список декартовых произведений (список списков), где каждое декартово произведение представляет собой фрагмент исходного df, для которого вы рассчитываете RSI. Затем мы можем перебирать этот список, чтобы получить каждый необходимый фрагмент df.
import pandas as pd
from copy import deepcopy
from tqdm import tqdm
df = pd.read_csv("./fake_data.csv")
time_window = 3
def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
"""
trends - list of unique trend values
returns - start and end index of each range (assume df sorted by range)
"""
trends_ranges = {}
for trend in trends:
df_trend = df[df["trend"] == trend]
start_idx = df_trend.iloc[0].name
end_idx = df_trend.iloc[-1].name
trends_ranges[trend] = (start_idx, end_idx)
return trends_ranges
def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
"""
trends - list of unique trend values
trends_ranges - {trend: (start_index, end idx)}
returns [[idx1, idx2, ..,idxt+1], [idx11, idx22, ..., idx2t+1],..] where t is time_window
"""
if time_window >= len(trends) or time_window <= 0:
raise Exception("Time Window not within bounds")
else:
unique_trend_analyses = len(trends) - time_window
idx_lst = []
for i in tqdm(range(unique_trend_analyses)):
fixed_t = trends[i : i + time_window]
fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
var_t = trends[i + time_window]
var_t_start_idx, var_t_stop_idx = (
trends_ranges[var_t][0],
trends_ranges[var_t][1] + 1,
)
for j in range(var_t_start_idx, var_t_stop_idx):
full_idx = deepcopy(fixed_idx)
full_idx.append(j)
idx_lst.append(full_idx)
return idx_lst
# in order to store ranges of each trend, we assume trend is sorted.
def get_idx_slices(df, time_window):
trends = df["trend"].unique().tolist()
trends_ranges = get_trend_ranges(df, trends)
idx_slices = _get_idx_slices(trends, trends_ranges, time_window)
return idx_slices
Теперь мы можем перебирать список idx, чтобы использовать нужные нам фрагменты df.
def get_RSI(df, time_window):
"""Return the RSI indicator for the specified time window."""
up_chg = df.apply(lambda x: x if x > 0 else 0)
down_chg = df.apply(lambda x: x if x < 0 else 0)
# We set com = time_window-1 so we get decay alpha=1/time_window.
up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
RS = abs(up_chg_avg / down_chg_avg)
RSI = 100 - 100 / (1 + RS)
return RSI.iloc[-1]
df["RSI"] = ""
idx_lst = get_idx_slices(df, time_window)
for idx in tqdm(idx_lst):
df_slice = df.iloc[idx]
df_slice_diff = df_slice["price"].diff()
RSI = get_RSI(df_slice_diff, time_window)
last_idx = idx[len(idx) - 1]
df.loc[last_idx, "RSI"] = RSI
Я частично воспользовался вашим подходом, исключив вызов на каждой линии, получилось гораздо быстрее.
Вместо того, чтобы читать файл по каждой строке в цикле, лучше сделать так:
ds = df.loc[:i]
Обновление 27.07.2024 Частично использовал то, что сделал dydev с range_group. Снятие звонка на каждой линии. Использование векторных операций. В результате за две минуты пересчитывается 4 миллиона строк. Это сработает, если тренды упорядочены. 1, 2, 3 и т. д., а не 1, 2, 3, 1, например. Перенес предыдущую версию вниз.
Генерация данных
n = 15
trend = np.random.randint(low=1, high=7, size=(n))
price = np.random.uniform(low=1, high=100, size=(n))
df = pd.DataFrame({'trend': trend, 'price': price})
df = df.sort_values(by=['trend']).reset_index(drop=True)
Сам код расчета:
time_window = 3
trends = df["trend"].unique()
arr = df['price'].values
range_group = np.stack(
[df[df["trend"] == trend].index.values.take([0, -1]) for trend in trends]
)
price = np.full((len(df), trends.size), np.nan)
prev = arr[range_group[:time_window, 1]]
for i in range(time_window, len(trends)):
stop = range_group[i, 1] + 1
price[range_group[i, 0]:stop, -1] = arr[range_group[i, 0]:stop]
price[range_group[i, 0]:stop, -(prev.size+1):-1] = prev
prev = price[range_group[i, 1], -(prev.size+1):]
price = price[range_group[time_window, 0]:]
diff = np.diff(price, axis=1)
up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]
up_chg_avg = pd.DataFrame(up_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1]
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1]
RS = np.abs(up_chg_avg / down_chg_avg)
df.loc[range_group[time_window, 0]:, 'newRSI'] = (100 - 100 / (1 + RS)).values
Предыдущая версия.
Колонка ind
создана для получения indexes
в map
.
Создается пустой массив arr
, который заполняется в каждой строке func_numpy
. Остальное — это ваши операции, которые преобразуются в векторные, исключаются из обработки построчно и выполняются за один раз.
Я также проверил, что рассчитанные значения RSI совпадают с вашим алгоритмом.
df['ind'] = df.index
size_trend = df['trend'].unique().size
arr = np.full((len(df), size_trend), np.nan)
def func_numpy(x):
row_last = df.loc[:x].groupby('trend')['price'].last().values
arr[x, -row_last.size:] = row_last
df['ind'].map(func_numpy)
diff = np.diff(arr, axis=1)
up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]
up_chg_avg = pd.DataFrame(up_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
RS = np.abs(up_chg_avg / down_chg_avg)
df['newRSI'] = 100 - 100 / (1 + RS)
Возможны ли строки с трендом 5, 6 и т. д. с одинаковым time_window =3?