Как рассчитать коэффициент корреляции в скользящем окне вектора с помощью numpy?

Я могу рассчитать скользящий коэффициент корреляции для одномерного массива (данные против [0, 1, 2, 3, 4]), используя цикл. Я ищу более разумное решение, используя numpy (не pandas). Вот мой текущий код:

import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
x = np.zeros_like(data).astype('float32')
length = 5
for i in range(length, data.shape[0]):
    x[i] = np.corrcoef(data[i - length:i], np.arange(length))[0, 1]
print(x)

х дает:

    [ 0.     0.     0.     0.     0.     0.607  0.959  0.98   0.328 -0.287
 -0.61  -0.314 -0.18  -0.8   -0.782 -0.847 -0.811 -0.825 -0.869 -0.283
  0.566  0.863  0.643  0.454]

Любое решение без цикла, пожалуйста?

numpy.org/devdocs/reference/generated/…
Pranav Hosangadi 10.01.2023 16:48

К вашему сведению, я добавил некоторые результаты определения времени всех решений здесь к моему ответу ниже. Я был очень удивлен результатами для больших входных данных.

Pranav Hosangadi 10.01.2023 18:30

Спасибо, Пранав Хосангади, настоящая законченная работа!

tibibou 10.01.2023 21:02
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
3
64
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Попроси и ты получишь. Вот решение, использующее рекурсию:

import numpy as np

data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
length = 5

def rolling_correlation_recurse(data, i, length) :
    assert i+length < data.size
    left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
    if i+length+1 == data.size :
        return left
    right = rolling_correlation_recurse(data, i+1, length)
    return np.concatenate((left, right))

def rolling_correlation(data, length) :
    return np.concatenate((
        np.zeros([length,]),
        rolling_correlation_recurse(data, 0, length)
    ))

print(rolling_correlation(data, length))

Обновлено: здесь тоже есть решение numpy:

n = len(data)
print(np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1])

«Решение для редактирования» для подхода numpy очень хорошее, оно работает, спасибо

tibibou 10.01.2023 17:45

Примечание: в данной формуле отсутствовал +1, так как последнее число корреляции упало. Здесь исправлено (для любого другого пользователя): np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1))) [:n-длина+1, -1]

tibibou 10.01.2023 17:52

@tibibou хорошее замечание, исправил

Rontogiannis Aristofanis 10.01.2023 17:52
Ответ принят как подходящий

Используйте numpy.lib.stride_tricks.sliding_window_view (доступно в numpy v1.20.0+)

swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))

который дает представление о массиве data, который выглядит так:

array([[10,  5,  8,  9, 15],
       [ 5,  8,  9, 15, 22],
       [ 8,  9, 15, 22, 26],
       [ 9, 15, 22, 26, 11],
       [15, 22, 26, 11, 15],
       [22, 26, 11, 15, 16],
       [26, 11, 15, 16, 18],
       [11, 15, 16, 18,  7],
       [15, 16, 18,  7,  4],
       [16, 18,  7,  4,  8],
       [18,  7,  4,  8, -2],
       [ 7,  4,  8, -2, -3],
       [ 4,  8, -2, -3, -4],
       [ 8, -2, -3, -4, -6],
       [-2, -3, -4, -6, -2],
       [-3, -4, -6, -2,  0],
       [-4, -6, -2,  0, 10],
       [-6, -2,  0, 10,  0],
       [-2,  0, 10,  0,  5],
       [ 0, 10,  0,  5,  8]])

Теперь мы хотим применить расчет коэффициента корреляции к каждой строке этого массива. К сожалению, np.corrcoef не принимает аргумент axis, он применяет вычисления ко всей матрице и не предоставляет способа сделать это для каждой строки/столбца.

Однако расчет коэффициента корреляции двух векторов довольно прост:

linear correlation coefficient

Применяя это здесь:

def vec_corrcoef(X, y, axis=1):
    Xm = np.mean(X, axis=axis, keepdims=True)
    ym = np.mean(y)
    n = np.sum((X - Xm) * (y - ym), axis=axis)
    d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
    return n / d

Теперь вызовите эту функцию с нашим массивом и arange:

cc = vec_corrcoef(swindow, np.arange(length))

что дает желаемый результат:

array([ 0.60697698,  0.95894955,  0.98      ,  0.3279521 , -0.28709766,
       -0.61035663, -0.31390158, -0.17995394, -0.80041656, -0.78192905,
       -0.84702587, -0.81091772, -0.82464375, -0.86892667, -0.28347335,
        0.56568542,  0.86304424,  0.64326752,  0.45374261,  0.38135638])

Чтобы получить x, просто установите соответствующие индексы массива zeros нужного размера.

Примечание. Я думаю, что ваш x должен содержать ненулевые значения, начиная с индекса 4 (потому что именно там скользящее окно заполнено), а не начиная с индекса 5.

x = np.zeros(data.shape)
x[-len(cc):] = cc

Если вы уверены, что ваши значения должны начинаться с индекса 5, вы можете сделать:

x = np.zeros(data.shape)
x[length:] = cc[:-1] # Ignore the last value in cc

Сравнение времени выполнения вашего исходного подхода с предложенными в ответах здесь:

  • f_OP_loopy — ваш подход, реализующий скользящее окно с помощью цикла
  • f_PH_numpy — это мой подход, который использует sliding_window_view и векторизованную функцию для построчного вычисления векторного коэффициента корреляции
  • f_RA_numpy — это подход Ронтоянниса, который разбивает arange, вычисляет коэффициент корреляции для всех матриц и выбирает только первые len(data) - length строки последнего столбца.
  • f_RA_recur — это рекурсивный подход Ронтоянниса, но я не учел его, потому что он пропускает последний коэффициент корреляции.

Timing plot

  1. Неудивительно, что решение, основанное только на numpy, работает быстрее, чем циклический подход.
  2. Мое решение numpy, которое вычисляет коэффициент корреляции по строкам, быстрее, чем то, что показано Rontogiannis ниже, потому что дополнительная работа связана с разбиением векторного ввода и вычислением корреляции всей матрицы только для отбрасывания нежелательных элементов. , мой подход избегает.
  3. По мере увеличения размера входных данных эта «дополнительная работа» в подходе Ронтоянниса увеличивается настолько, что время его выполнения становится даже хуже, чем при циклическом подходе! Я не уверен, связано ли это дополнительное время с расчетом np.corrcoef или с операцией np.tile.

Note: This plot was obtained on my 2.2GHz i7 Macbook Air with 8GB RAM, Python 3.10.7 and numpy 1.23.3. Similar results were obtained on Google Colab

Если вас интересует временной код, вот он:

import timeit
import numpy as np
from matplotlib import pyplot as plt

def time_funcs(funcs, sizes, arg_gen, N=20):
    times = np.zeros((len(sizes), len(funcs)))
    gdict = globals().copy()
    for i, s in enumerate(sizes):
        args = arg_gen(s)
        print(args)
        for j, f in enumerate(funcs):
            gdict.update(locals())
            try:
                times[i, j] = timeit.timeit("f(*args)", globals=gdict, number=N) / N
                print(f"{i}/{len(sizes)}, {j}/{len(funcs)}, {times[i, j]}")
            except ValueError:
                print(f"ERROR in {f}, with args = ", *args)
                
            
    return times

def plot_times(times, funcs):
    fig, ax = plt.subplots()
    for j, f in enumerate(funcs):
        ax.plot(sizes, times[:, j], label=f.__name__)
    
    
    ax.set_xlabel("Array size")
    ax.set_ylabel("Time per function call (s)")
    ax.set_xscale("log")
    ax.set_yscale("log")
    ax.legend()
    ax.grid()
    fig.tight_layout()
    return fig, ax

#%%
def arg_gen(n):
    return [np.random.randint(-100, 100, (n,)), 5]
    
#%%
def f_OP_loopy(data, length):
    x = np.zeros_like(data).astype('float32')
    for i in range(length-1, data.shape[0]):
        x[i] = np.corrcoef(data[i - length + 1:i+1], np.arange(length))[0, 1]
    return x
    

def f_PH_numpy(data, length):    
    swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
    cc = vec_corrcoef(swindow, np.arange(length))
    x = np.zeros(data.shape)
    x[-len(cc):] = cc
    return x

def f_RA_recur(data, length):
    return np.concatenate((
        np.zeros([length,]),
        rolling_correlation_recurse(data, 0, length)
    ))

def f_RA_numpy(data, length):
    n = len(data)
    cc = np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1]
    x = np.zeros(data.shape)
    x[-len(cc):] = cc
    return x
#%%

def rolling_correlation_recurse(data, i, length) :
    assert i+length < data.size
    left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
    if i+length+1 == data.size :
        return left
    right = rolling_correlation_recurse(data, i+1, length)
    return np.concatenate((left, right))

def vec_corrcoef(X, y, axis=1):
    Xm = np.mean(X, axis=axis, keepdims=True)
    ym = np.mean(y)
    n = np.sum((X - Xm) * (y - ym), axis=axis)
    d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
    return n / d

#%% 
if __name__ == "__main__":
    #%% Set up sim
    sizes = [5, 10, 50, 100, 500, 1000, 5000, 10_000] #, 50_000, 100_000]
    funcs = [f_OP_loopy, #f_RA_recur,
             f_PH_numpy, f_RA_numpy]
    
    
    #%% Run timing
    time_fcalls = np.zeros((len(sizes), len(funcs))) * np.nan
    time_fcalls = time_funcs(funcs, sizes, arg_gen)
    
    fig, ax = plot_times(time_fcalls, funcs)
    ax.set_xlabel(f"Input size")

    plt.show()
    input("Enter x to exit")

Очень четкое решение, и я оценил объяснение (действительно полезное). Спасибо за исправление индекса (вы правы с 4).

tibibou 10.01.2023 17:46

Другие вопросы по теме