Как рассчитать коэффициент корреляции в скользящем окне вектора с помощью numpy?

Я могу рассчитать скользящий коэффициент корреляции для одномерного массива (данные против [0, 1, 2, 3, 4]), используя цикл. Я ищу более разумное решение, используя numpy (не pandas). Вот мой текущий код:

import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
x = np.zeros_like(data).astype('float32')
length = 5
for i in range(length, data.shape[0]):
    x[i] = np.corrcoef(data[i - length:i], np.arange(length))[0, 1]
print(x)

Х дает:

    [ 0.     0.     0.     0.     0.     0.607  0.959  0.98   0.328 -0.287
 -0.61  -0.314 -0.18  -0.8   -0.782 -0.847 -0.811 -0.825 -0.869 -0.283
  0.566  0.863  0.643  0.454]

Любое решение без цикла, пожалуйста?

numpy.org/devdocs/reference/generated/…
Pranav Hosangadi 10.01.2023 16:48

К вашему сведению, я добавил некоторые результаты определения времени всех решений здесь к моему ответу ниже. Я был очень удивлен результатами для больших входных данных.

Pranav Hosangadi 10.01.2023 18:30

Спасибо, Пранав Хосангади, настоящая законченная работа!

tibibou 10.01.2023 21:02
Как подобрать выигрышные акции с помощью анализа и визуализации на Python
Как подобрать выигрышные акции с помощью анализа и визуализации на Python
Отказ от ответственности: Эта статья предназначена только для демонстрации и не должна использоваться в качестве инвестиционного совета.
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
Потяните за рычаг выброса энергососущих проектов
Потяните за рычаг выброса энергососущих проектов
На этой неделе моя команда отменила проект, над которым я работал. Неделя усилий пошла насмарку.
Инструменты для веб-скрапинга с открытым исходным кодом: Python Developer Toolkit
Инструменты для веб-скрапинга с открытым исходным кодом: Python Developer Toolkit
Веб-скрейпинг, как мы все знаем, это дисциплина, которая развивается с течением времени. Появляются все более сложные средства борьбы с ботами, а...
Библиотека для работы с мороженым
Библиотека для работы с мороженым
Лично я попрощался с операторами print() в python. Без шуток.
Эмиссия счетов-фактур с помощью Telegram - Python RPA (BotCity)
Эмиссия счетов-фактур с помощью Telegram - Python RPA (BotCity)
Привет, люди RPA, это снова я и я несу подарки! В очередном моем приключении о том, как создавать ботов для облегчения рутины. Вот, думаю, стоит...
2
3
64
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Попроси и ты получишь. Вот решение, использующее рекурсию:

import numpy as np

data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
length = 5

def rolling_correlation_recurse(data, i, length) :
    assert i+length < data.size
    left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
    if i+length+1 == data.size :
        return left
    right = rolling_correlation_recurse(data, i+1, length)
    return np.concatenate((left, right))

def rolling_correlation(data, length) :
    return np.concatenate((
        np.zeros([length,]),
        rolling_correlation_recurse(data, 0, length)
    ))

print(rolling_correlation(data, length))

Обновлено: здесь тоже есть решение numpy:

n = len(data)
print(np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1])

«Решение для редактирования» для подхода numpy очень хорошее, оно работает, спасибо

tibibou 10.01.2023 17:45

Примечание: в данной формуле отсутствовал +1, так как последнее число корреляции упало. Здесь исправлено (для любого другого пользователя): np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1))) [:n-длина+1, -1]

tibibou 10.01.2023 17:52

@tibibou хорошее замечание, исправил

Rontogiannis Aristofanis 10.01.2023 17:52
Ответ принят как подходящий

Используйте numpy.lib.stride_tricks.sliding_window_view (доступно в numpy v1.20.0+)

swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))

Который дает представление о массиве data, который выглядит так:

array([[10,  5,  8,  9, 15],
       [ 5,  8,  9, 15, 22],
       [ 8,  9, 15, 22, 26],
       [ 9, 15, 22, 26, 11],
       [15, 22, 26, 11, 15],
       [22, 26, 11, 15, 16],
       [26, 11, 15, 16, 18],
       [11, 15, 16, 18,  7],
       [15, 16, 18,  7,  4],
       [16, 18,  7,  4,  8],
       [18,  7,  4,  8, -2],
       [ 7,  4,  8, -2, -3],
       [ 4,  8, -2, -3, -4],
       [ 8, -2, -3, -4, -6],
       [-2, -3, -4, -6, -2],
       [-3, -4, -6, -2,  0],
       [-4, -6, -2,  0, 10],
       [-6, -2,  0, 10,  0],
       [-2,  0, 10,  0,  5],
       [ 0, 10,  0,  5,  8]])

Теперь мы хотим применить расчет коэффициента корреляции к каждой строке этого массива. К сожалению, np.corrcoef не принимает аргумент axis, он применяет вычисления ко всей матрице и не предоставляет способа сделать это для каждой строки/столбца.

Однако расчет коэффициента корреляции двух векторов довольно прост:

Применяя это здесь:

def vec_corrcoef(X, y, axis=1):
    Xm = np.mean(X, axis=axis, keepdims=True)
    ym = np.mean(y)
    n = np.sum((X - Xm) * (y - ym), axis=axis)
    d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
    return n / d

Теперь вызовите эту функцию с нашим массивом и arange:

cc = vec_corrcoef(swindow, np.arange(length))

Что дает желаемый результат:

array([ 0.60697698,  0.95894955,  0.98      ,  0.3279521 , -0.28709766,
       -0.61035663, -0.31390158, -0.17995394, -0.80041656, -0.78192905,
       -0.84702587, -0.81091772, -0.82464375, -0.86892667, -0.28347335,
        0.56568542,  0.86304424,  0.64326752,  0.45374261,  0.38135638])

Чтобы получить x, просто установите соответствующие индексы массива zeros нужного размера.

Примечание. Я думаю, что ваш x должен содержать ненулевые значения, начиная с индекса 4 (потому что именно там скользящее окно заполнено), а не начиная с индекса 5.

x = np.zeros(data.shape)
x[-len(cc):] = cc

Если вы уверены, что ваши значения должны начинаться с индекса 5, вы можете сделать:

x = np.zeros(data.shape)
x[length:] = cc[:-1] # Ignore the last value in cc

Сравнение времени выполнения вашего исходного подхода с предложенными в ответах здесь:

  • f_OP_loopy — ваш подход, реализующий скользящее окно с помощью цикла
  • f_PH_numpy — это мой подход, который использует sliding_window_view и векторизованную функцию для построчного вычисления векторного коэффициента корреляции
  • f_RA_numpy — это подход Ронтоянниса, который разбивает arange, вычисляет коэффициент корреляции для всех матриц и выбирает только первые len(data) - length строки последнего столбца.
  • f_RA_recur — это рекурсивный подход Ронтоянниса, но я не учел его, потому что он пропускает последний коэффициент корреляции.

  1. Неудивительно, что решение, основанное только на numpy, работает быстрее, чем циклический подход.
  2. Мое решение numpy, которое вычисляет коэффициент корреляции по строкам, быстрее, чем то, что показано Rontogiannis ниже, потому что дополнительная работа связана с разбиением векторного ввода и вычислением корреляции всей матрицы только для отбрасывания нежелательных элементов. , мой подход избегает.
  3. По мере увеличения размера входных данных эта «дополнительная работа» в подходе Ронтоянниса увеличивается настолько, что время его выполнения становится даже хуже, чем при циклическом подходе! Я не уверен, связано ли это дополнительное время с расчетом np.corrcoef или с операцией np.tile.

Note: This plot was obtained on my 2.2GHz i7 Macbook Air with 8GB RAM, Python 3.10.7 and numpy 1.23.3. Similar results were obtained on Google Colab

Если вас интересует временной код, вот он:

import timeit
import numpy as np
from matplotlib import pyplot as plt

def time_funcs(funcs, sizes, arg_gen, N=20):
    times = np.zeros((len(sizes), len(funcs)))
    gdict = globals().copy()
    for i, s in enumerate(sizes):
        args = arg_gen(s)
        print(args)
        for j, f in enumerate(funcs):
            gdict.update(locals())
            try:
                times[i, j] = timeit.timeit("f(*args)", globals=gdict, number=N) / N
                print(f"{i}/{len(sizes)}, {j}/{len(funcs)}, {times[i, j]}")
            except ValueError:
                print(f"ERROR in {f}, with args = ", *args)
                
            
    return times

def plot_times(times, funcs):
    fig, ax = plt.subplots()
    for j, f in enumerate(funcs):
        ax.plot(sizes, times[:, j], label=f.__name__)
    
    
    ax.set_xlabel("Array size")
    ax.set_ylabel("Time per function call (s)")
    ax.set_xscale("log")
    ax.set_yscale("log")
    ax.legend()
    ax.grid()
    fig.tight_layout()
    return fig, ax

#%%
def arg_gen(n):
    return [np.random.randint(-100, 100, (n,)), 5]
    
#%%
def f_OP_loopy(data, length):
    x = np.zeros_like(data).astype('float32')
    for i in range(length-1, data.shape[0]):
        x[i] = np.corrcoef(data[i - length + 1:i+1], np.arange(length))[0, 1]
    return x
    

def f_PH_numpy(data, length):    
    swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
    cc = vec_corrcoef(swindow, np.arange(length))
    x = np.zeros(data.shape)
    x[-len(cc):] = cc
    return x

def f_RA_recur(data, length):
    return np.concatenate((
        np.zeros([length,]),
        rolling_correlation_recurse(data, 0, length)
    ))

def f_RA_numpy(data, length):
    n = len(data)
    cc = np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1]
    x = np.zeros(data.shape)
    x[-len(cc):] = cc
    return x
#%%

def rolling_correlation_recurse(data, i, length) :
    assert i+length < data.size
    left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
    if i+length+1 == data.size :
        return left
    right = rolling_correlation_recurse(data, i+1, length)
    return np.concatenate((left, right))

def vec_corrcoef(X, y, axis=1):
    Xm = np.mean(X, axis=axis, keepdims=True)
    ym = np.mean(y)
    n = np.sum((X - Xm) * (y - ym), axis=axis)
    d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
    return n / d

#%% 
if __name__ == "__main__":
    #%% Set up sim
    sizes = [5, 10, 50, 100, 500, 1000, 5000, 10_000] #, 50_000, 100_000]
    funcs = [f_OP_loopy, #f_RA_recur,
             f_PH_numpy, f_RA_numpy]
    
    
    #%% Run timing
    time_fcalls = np.zeros((len(sizes), len(funcs))) * np.nan
    time_fcalls = time_funcs(funcs, sizes, arg_gen)
    
    fig, ax = plot_times(time_fcalls, funcs)
    ax.set_xlabel(f"Input size")

    plt.show()
    input("Enter x to exit")

Очень четкое решение, и я оценил объяснение (действительно полезное). Спасибо за исправление индекса (вы правы с 4).

tibibou 10.01.2023 17:46

Другие вопросы по теме