Я могу рассчитать скользящий коэффициент корреляции для одномерного массива (данные против [0, 1, 2, 3, 4]), используя цикл. Я ищу более разумное решение, используя numpy (не pandas). Вот мой текущий код:
import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
x = np.zeros_like(data).astype('float32')
length = 5
for i in range(length, data.shape[0]):
x[i] = np.corrcoef(data[i - length:i], np.arange(length))[0, 1]
print(x)
Х дает:
[ 0. 0. 0. 0. 0. 0.607 0.959 0.98 0.328 -0.287
-0.61 -0.314 -0.18 -0.8 -0.782 -0.847 -0.811 -0.825 -0.869 -0.283
0.566 0.863 0.643 0.454]
Любое решение без цикла, пожалуйста?
К вашему сведению, я добавил некоторые результаты определения времени всех решений здесь к моему ответу ниже. Я был очень удивлен результатами для больших входных данных.
Спасибо, Пранав Хосангади, настоящая законченная работа!
Попроси и ты получишь. Вот решение, использующее рекурсию:
import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
length = 5
def rolling_correlation_recurse(data, i, length) :
assert i+length < data.size
left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
if i+length+1 == data.size :
return left
right = rolling_correlation_recurse(data, i+1, length)
return np.concatenate((left, right))
def rolling_correlation(data, length) :
return np.concatenate((
np.zeros([length,]),
rolling_correlation_recurse(data, 0, length)
))
print(rolling_correlation(data, length))
Обновлено: здесь тоже есть решение numpy:
n = len(data)
print(np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1])
«Решение для редактирования» для подхода numpy очень хорошее, оно работает, спасибо
Примечание: в данной формуле отсутствовал +1, так как последнее число корреляции упало. Здесь исправлено (для любого другого пользователя): np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1))) [:n-длина+1, -1]
@tibibou хорошее замечание, исправил
Используйте numpy.lib.stride_tricks.sliding_window_view (доступно в numpy v1.20.0+)
swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
Который дает представление о массиве data, который выглядит так:
array([[10, 5, 8, 9, 15],
[ 5, 8, 9, 15, 22],
[ 8, 9, 15, 22, 26],
[ 9, 15, 22, 26, 11],
[15, 22, 26, 11, 15],
[22, 26, 11, 15, 16],
[26, 11, 15, 16, 18],
[11, 15, 16, 18, 7],
[15, 16, 18, 7, 4],
[16, 18, 7, 4, 8],
[18, 7, 4, 8, -2],
[ 7, 4, 8, -2, -3],
[ 4, 8, -2, -3, -4],
[ 8, -2, -3, -4, -6],
[-2, -3, -4, -6, -2],
[-3, -4, -6, -2, 0],
[-4, -6, -2, 0, 10],
[-6, -2, 0, 10, 0],
[-2, 0, 10, 0, 5],
[ 0, 10, 0, 5, 8]])
Теперь мы хотим применить расчет коэффициента корреляции к каждой строке этого массива. К сожалению, np.corrcoef не принимает аргумент axis, он применяет вычисления ко всей матрице и не предоставляет способа сделать это для каждой строки/столбца.
Однако расчет коэффициента корреляции двух векторов довольно прост:
Применяя это здесь:
def vec_corrcoef(X, y, axis=1):
Xm = np.mean(X, axis=axis, keepdims=True)
ym = np.mean(y)
n = np.sum((X - Xm) * (y - ym), axis=axis)
d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
return n / d
Теперь вызовите эту функцию с нашим массивом и arange:
cc = vec_corrcoef(swindow, np.arange(length))
Что дает желаемый результат:
array([ 0.60697698, 0.95894955, 0.98 , 0.3279521 , -0.28709766,
-0.61035663, -0.31390158, -0.17995394, -0.80041656, -0.78192905,
-0.84702587, -0.81091772, -0.82464375, -0.86892667, -0.28347335,
0.56568542, 0.86304424, 0.64326752, 0.45374261, 0.38135638])
Чтобы получить x, просто установите соответствующие индексы массива zeros нужного размера.
Примечание. Я думаю, что ваш x должен содержать ненулевые значения, начиная с индекса 4 (потому что именно там скользящее окно заполнено), а не начиная с индекса 5.
x = np.zeros(data.shape)
x[-len(cc):] = cc
Если вы уверены, что ваши значения должны начинаться с индекса 5, вы можете сделать:
x = np.zeros(data.shape)
x[length:] = cc[:-1] # Ignore the last value in cc
Сравнение времени выполнения вашего исходного подхода с предложенными в ответах здесь:
f_OP_loopy
— ваш подход, реализующий скользящее окно с помощью циклаf_PH_numpy
— это мой подход, который использует sliding_window_view
и векторизованную функцию для построчного вычисления векторного коэффициента корреляцииf_RA_numpy
— это подход Ронтоянниса, который разбивает arange
, вычисляет коэффициент корреляции для всех матриц и выбирает только первые len(data) - length
строки последнего столбца.f_RA_recur
— это рекурсивный подход Ронтоянниса, но я не учел его, потому что он пропускает последний коэффициент корреляции.np.corrcoef
или с операцией np.tile
.Note: This plot was obtained on my 2.2GHz i7 Macbook Air with 8GB RAM, Python 3.10.7 and numpy 1.23.3. Similar results were obtained on Google Colab
Если вас интересует временной код, вот он:
import timeit
import numpy as np
from matplotlib import pyplot as plt
def time_funcs(funcs, sizes, arg_gen, N=20):
times = np.zeros((len(sizes), len(funcs)))
gdict = globals().copy()
for i, s in enumerate(sizes):
args = arg_gen(s)
print(args)
for j, f in enumerate(funcs):
gdict.update(locals())
try:
times[i, j] = timeit.timeit("f(*args)", globals=gdict, number=N) / N
print(f"{i}/{len(sizes)}, {j}/{len(funcs)}, {times[i, j]}")
except ValueError:
print(f"ERROR in {f}, with args = ", *args)
return times
def plot_times(times, funcs):
fig, ax = plt.subplots()
for j, f in enumerate(funcs):
ax.plot(sizes, times[:, j], label=f.__name__)
ax.set_xlabel("Array size")
ax.set_ylabel("Time per function call (s)")
ax.set_xscale("log")
ax.set_yscale("log")
ax.legend()
ax.grid()
fig.tight_layout()
return fig, ax
#%%
def arg_gen(n):
return [np.random.randint(-100, 100, (n,)), 5]
#%%
def f_OP_loopy(data, length):
x = np.zeros_like(data).astype('float32')
for i in range(length-1, data.shape[0]):
x[i] = np.corrcoef(data[i - length + 1:i+1], np.arange(length))[0, 1]
return x
def f_PH_numpy(data, length):
swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
cc = vec_corrcoef(swindow, np.arange(length))
x = np.zeros(data.shape)
x[-len(cc):] = cc
return x
def f_RA_recur(data, length):
return np.concatenate((
np.zeros([length,]),
rolling_correlation_recurse(data, 0, length)
))
def f_RA_numpy(data, length):
n = len(data)
cc = np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1]
x = np.zeros(data.shape)
x[-len(cc):] = cc
return x
#%%
def rolling_correlation_recurse(data, i, length) :
assert i+length < data.size
left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
if i+length+1 == data.size :
return left
right = rolling_correlation_recurse(data, i+1, length)
return np.concatenate((left, right))
def vec_corrcoef(X, y, axis=1):
Xm = np.mean(X, axis=axis, keepdims=True)
ym = np.mean(y)
n = np.sum((X - Xm) * (y - ym), axis=axis)
d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
return n / d
#%%
if __name__ == "__main__":
#%% Set up sim
sizes = [5, 10, 50, 100, 500, 1000, 5000, 10_000] #, 50_000, 100_000]
funcs = [f_OP_loopy, #f_RA_recur,
f_PH_numpy, f_RA_numpy]
#%% Run timing
time_fcalls = np.zeros((len(sizes), len(funcs))) * np.nan
time_fcalls = time_funcs(funcs, sizes, arg_gen)
fig, ax = plot_times(time_fcalls, funcs)
ax.set_xlabel(f"Input size")
plt.show()
input("Enter x to exit")
Очень четкое решение, и я оценил объяснение (действительно полезное). Спасибо за исправление индекса (вы правы с 4).