Как рассчитать индекс относительной силы (RSI) с помощью итераций записи в фрейме данных pandas

Я создал фрейм данных pandas следующим образом:

import pandas as pd
import numpy as np
    
ds = { 'trend' : [1,1,1,1,2,2,3,3,3,3,3,3,4,4,4,4,4], 'price' : [23,43,56,21,43,55,54,32,9,12,11,12,23,3,2,1,1]}

df = pd.DataFrame(data=ds)

Кадр данных выглядит следующим образом:

display(df)

   trend    price
0   1        23
1   1        43
2   1        56
3   1        21
4   2        43
5   2        55
6   3        54
7   3        32 
8   3         9
9   3        12
10  3        11
11  3        12
12  4        23
13  4         3
14  4         2
15  4         1
16  4         1

Я сохранил фрейм данных в файле .csv с именем df.csv:

df.to_csv("df.csv", index = False)

Затем я создал функцию, которая рассчитывает индекс относительной силы (RSI — см.: https://www.investopedia.com/terms/r/rsi.asp):

def get_RSI(df, column, time_window):
    """Return the RSI indicator for the specified time window."""
    diff = df[column].diff(1)

    # This preservers dimensions off diff values.
    up_chg = 0 * diff
    down_chg = 0 * diff

    # Up change is equal to the positive difference, otherwise equal to zero.
    up_chg[diff > 0] = diff[diff > 0]

    # Down change is equal to negative deifference, otherwise equal to zero.
    down_chg[diff < 0] = diff[diff < 0]

    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1,
                            min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1,
                                min_periods=time_window).mean()

    RS = abs(up_chg_avg / down_chg_avg)
    df['RSI'] = 100 - 100 / (1 + RS)
    df = df[['RSI']]
    return df

Мне нужно создать новое поле под названием RSI, которое:

  1. перебирает каждую запись кадра данных
  2. вычисляет RSI, учитывая price, наблюдаемый на каждой итерации и последней цены (длина RSI в этом примере равна 3), наблюдавшиеся в предыдущих трендах.

Например:

  • Я повторяю запись 0, а RSI равен NaN (отсутствует).
  • Я повторяю запись 1, и RSI все еще NaN (отсутствует)
  • Я повторяю запись 12, и RSI равен 47,667343 (он учитывает цену записи 3, цену записи 5 и цену записи 12).
  • Я повторяю запись 13, и RSI равен 28,631579 (он учитывает цену записи 3, цену записи 5 и цену записи 13).
  • Я повторяю запись 15, и RSI равен 27,586207 (он учитывает цену записи 3, цену записи 5 и цену записи 15).
  • и так далее .....

Затем я написал этот код:

rsi = []

for i in range(len(df)):

    ds = pd.read_csv("df.csv", nrows=i+1)
    print(ds.info())
    d = ds.groupby(['trend'], as_index=False).agg(
                                                    {'price':'last'})

    get_RSI(d,'price',3)
    rsi.append(d['RSI'].iloc[-1])

df['RSI'] = rsi

Набор данных выглядит правильно:

display(df)

  trend price   RSI
0   1   23     NaN
1   1   43     NaN
2   1   56     NaN
3   1   21     NaN
4   2   43     NaN
5   2   55     NaN
6   3   54     NaN
7   3   32     NaN
8   3   9      NaN
9   3   12     NaN
10  3   11     NaN
11  3   12     NaN
12  4   23     47.667343
13  4   3      28.631579
14  4   2      28.099174
15  4   1      27.586207
16  4   1      27.586207

Проблема в том, что мне нужно обработать около 4 миллионов записей, и это займет примерно 60 часов.

Кто-нибудь знает, как получить те же результаты быстро и эффективно?

Возможны ли строки с трендом 5, 6 и т. д. с одинаковым time_window =3?

inquirer 21.07.2024 18:20

да, конечно, это возможно

Giampaolo Levorato 22.07.2024 14:26

обновил код, теперь 4 миллиона рассчитываются за две минуты.

inquirer 27.07.2024 12:34
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
3
122
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

В вопросе не было до конца ясно, чего пытался достичь анализ — существует высокая вероятность того, что существует более эффективный метод, если это понять.

Используя приведенный ниже подход, мне удалось сократить время с 5 часов до 35 минут в смоделированном наборе данных, при этом используя ту же методологию.

Я смоделировал ваш набор данных 4 м следующим образом:

import random
from tqdm import tqdm

random.seed(1337)
trends = [x for x in range(1000)]

ds = {
    # sorted as I assume the 'trend' is the time indicator.
    "trend": sorted(
        [trends[int((random.random() * 1000))] for _ in tqdm(range(int(4e6)))]
    ),
    "price": [random.random() * 100 for _ in tqdm(range(int(4e6)))],
}

df = pd.DataFrame(data=ds)
df.to_csv(
    "fake_data.csv", index=True
)  # reccommend keeping index to keep track of the time series

Прежде чем перебирать df, я создал список декартовых произведений (список списков), где каждое декартово произведение представляет собой фрагмент исходного df, для которого вы рассчитываете RSI. Затем мы можем перебирать этот список, чтобы получить каждый необходимый фрагмент df.

import pandas as pd
from copy import deepcopy
from tqdm import tqdm

df = pd.read_csv("./fake_data.csv")
time_window = 3


def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
    """
    trends - list of unique trend values
    returns - start and end index of each range (assume df sorted by range)
    """
    trends_ranges = {}
    for trend in trends:
        df_trend = df[df["trend"] == trend]
        start_idx = df_trend.iloc[0].name
        end_idx = df_trend.iloc[-1].name
        trends_ranges[trend] = (start_idx, end_idx)
    return trends_ranges


def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
    """
    trends - list of unique trend values
    trends_ranges - {trend: (start_index, end idx)}
    returns [[idx1, idx2, ..,idxt+1], [idx11, idx22, ..., idx2t+1],..] where t is time_window
    """
    if time_window >= len(trends) or time_window <= 0:
        raise Exception("Time Window not within bounds")
    else:
        unique_trend_analyses = len(trends) - time_window
        idx_lst = []
        for i in tqdm(range(unique_trend_analyses)):
            fixed_t = trends[i : i + time_window]
            fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
            var_t = trends[i + time_window]
            var_t_start_idx, var_t_stop_idx = (
                trends_ranges[var_t][0],
                trends_ranges[var_t][1] + 1,
            )
            for j in range(var_t_start_idx, var_t_stop_idx):
                full_idx = deepcopy(fixed_idx)
                full_idx.append(j)
                idx_lst.append(full_idx)
    return idx_lst


# in order to store ranges of each trend, we assume trend is sorted.
def get_idx_slices(df, time_window):
    trends = df["trend"].unique().tolist()
    trends_ranges = get_trend_ranges(df, trends)
    idx_slices = _get_idx_slices(trends, trends_ranges, time_window)
    return idx_slices

Теперь мы можем перебирать список idx, чтобы использовать нужные нам фрагменты df.

def get_RSI(df, time_window):
    """Return the RSI indicator for the specified time window."""

    up_chg = df.apply(lambda x: x if x > 0 else 0)
    down_chg = df.apply(lambda x: x if x < 0 else 0)

    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()

    RS = abs(up_chg_avg / down_chg_avg)
    RSI = 100 - 100 / (1 + RS)
    return RSI.iloc[-1]


df["RSI"] = ""

idx_lst = get_idx_slices(df, time_window)

for idx in tqdm(idx_lst):
    df_slice = df.iloc[idx]
    df_slice_diff = df_slice["price"].diff()

    RSI = get_RSI(df_slice_diff, time_window)
    last_idx = idx[len(idx) - 1]

    df.loc[last_idx, "RSI"] = RSI

Я частично воспользовался вашим подходом, исключив вызов на каждой линии, получилось гораздо быстрее.

inquirer 27.07.2024 12:36

Вместо того, чтобы читать файл по каждой строке в цикле, лучше сделать так:

ds = df.loc[:i]

Обновление 27.07.2024 Частично использовал то, что сделал dydev с range_group. Снятие звонка на каждой линии. Использование векторных операций. В результате за две минуты пересчитывается 4 миллиона строк. Это сработает, если тренды упорядочены. 1, 2, 3 и т. д., а не 1, 2, 3, 1, например. Перенес предыдущую версию вниз.

Генерация данных

n = 15
trend = np.random.randint(low=1, high=7, size=(n))
price = np.random.uniform(low=1, high=100, size=(n))
df = pd.DataFrame({'trend': trend, 'price': price})
df = df.sort_values(by=['trend']).reset_index(drop=True)

Сам код расчета:

time_window = 3
trends = df["trend"].unique()

arr = df['price'].values

range_group = np.stack(
    [df[df["trend"] == trend].index.values.take([0, -1]) for trend in trends]
)
price =  np.full((len(df), trends.size), np.nan)
prev = arr[range_group[:time_window, 1]]

for i in range(time_window, len(trends)):
    stop = range_group[i, 1] + 1
    price[range_group[i, 0]:stop, -1] = arr[range_group[i, 0]:stop]
    price[range_group[i, 0]:stop, -(prev.size+1):-1] = prev
    prev = price[range_group[i, 1], -(prev.size+1):]


price = price[range_group[time_window, 0]:]
diff = np.diff(price, axis=1)

up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]

up_chg_avg = pd.DataFrame(up_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1]
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1]


RS = np.abs(up_chg_avg / down_chg_avg)
df.loc[range_group[time_window, 0]:, 'newRSI'] = (100 - 100 / (1 + RS)).values

Предыдущая версия. Колонка ind создана для получения indexes в map. Создается пустой массив arr, который заполняется в каждой строке func_numpy. Остальное — это ваши операции, которые преобразуются в векторные, исключаются из обработки построчно и выполняются за один раз.

Я также проверил, что рассчитанные значения RSI совпадают с вашим алгоритмом.

df['ind'] = df.index
size_trend = df['trend'].unique().size
arr =  np.full((len(df), size_trend), np.nan)

def func_numpy(x):
    row_last = df.loc[:x].groupby('trend')['price'].last().values
    arr[x, -row_last.size:] = row_last

df['ind'].map(func_numpy)

diff = np.diff(arr, axis=1)

up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]

up_chg_avg = pd.DataFrame(up_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1].values

RS = np.abs(up_chg_avg / down_chg_avg)
df['newRSI'] = 100 - 100 / (1 + RS)

Другие вопросы по теме

Похожие вопросы

Как устранить ошибку «Необходимо указать, как согласовать расходящиеся ветки» при перебазировании в Git?
Как сгруппировать строки на основе идентификатора столбца в фрейме данных pandas?
Интерпретация битовых данных в Python с помощью структур и ctypes: невыровненные результаты
Мой чат-сервер Python не отправляет сообщения должным образом
Как сообщить средству проверки типов тип вывода внешней функции
Разделить фрейм данных pandas на основе заданной строки строки
Как объединить подсказку типа с использованием переменной связанного типа и статических типов для максимальной гибкости?
Сравните строки из очень большого текстового файла (более 100 ГБ) с небольшим текстовым файлом (около 30 строк) и распечатайте все строки, содержащиеся в обоих файлах
Переместить часть строки в фрейме данных в новую строку
Как немедленно отменить задачу Asyncio, которая использует библиотеку Ollama Python для генерации ответа?