Моя матрица проста, например:
# python3 numpy
>>> A
array([[0., 0., 1., 1., 1.],
[0., 0., 1., 1., 1.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.]])
>>> P
array([[0., 0., 0., 0.]])
Мне нужно найти полностью нулевую область (достаточно одной) в A с тем же размером, что и P (1x4). Таким образом, правильный ответ включает в себя:
(2, 0) # The vertex coordinates of the all-zero rectangular region that P can be matched
(2, 1)
(3, 0)
(3, 1)
(4, 0)
(4, 1)
# Just get any 1 answer
На самом деле моя матрица A достигнет размера 30 000 * 30 000. Я беспокоюсь, что он будет медленным, если будет написан как оператор цикла. Есть ли быстрый способ?
Размер P не определен, от 10*30 до 4000*80. В то же время матрице A не хватает регулярности, и зацикливание из любой точки может потребовать обхода всей матрицы для успешного сопоставления.
Будет ли P
всегда иметь этот размер и форму, или вам нужно обобщать и для этого?
@KarlKnechtel не всегда
@Julien Если есть работающий метод numpy, я могу сравнить его с циклом. Потому что на самом деле существует большое количество случаев совпадения верхнего левого и нижнего правого. Независимо от того, где вы начнете цикл, вы можете столкнуться с худшим
Насколько велика может быть матрица P
? В вашем примере матрица имеет только 0 или 1, то же самое для вашей фактической матрицы?
@ken Конкретное значение определить невозможно, возможно 10*20~3000*80~4000*3000 или больше. Но он не превышает размеров A
np.where((np.lib.stride_tricks.sliding_window_view(A, P.shape) == P).all(axis=tuple(range(-len(P.shape),0))))
работает по крайней мере для достаточно маленьких массивов.
Прежде всего, полный анализ A
может быть дорогим, особенно потому, что A
занимает много памяти (поэтому он поместится в ОЗУ), а ОЗУ работает медленно. Мы можем сканировать A
строку за строкой, чтобы узнать, может ли P
поместиться в np
последних строках A
, где np,mp = P.shape
и na,ma = A.shape
. Дело в том, что np
может быть довольно большим, поэтому нам нужен быстрый способ, так что проверьте это.
Чтобы упростить понимание проблемы, предположим, что мы можем (эффективно) предварительно вычислить B = A == 0
. Теперь вопрос в том, как найти двумерную область значений True
размером не менее (np,mp)
. Для этого мы можем вычислить логическое И np
последних строк B
и проверить, есть ли в результате последовательность хотя бы mp
значений. Например, если np=3
и mp=4
, то мы можем вычислить mask = B[i] & B[i+1] & B[i+2]
для каждого возможного i
, а затем проверить для каждого mask
, есть ли 4 последовательных True
. Эта операция представляет собой не что иное, как 2D-свертку с логическим оператором И, а описанная выше оптимизация (значительно снижающая алгоритмическую сложность) называется разделением фильтров.
На самом деле обратите внимание, что логическое И логических чисел эквивалентно произведению значений 0-1. Это означает, что мы можем ускорить свертку, используя двумерное быстрое преобразование Фурье (БПФ). Хотя 2D-БПФ можно использовать для алгоритмически оптимального решения этой задачи (за время O(na ma ((log na) + (log ma)))
), для этого требуется значительный объем памяти, и мы не можем комбинировать их со стратегией сканирования строк. Мы можем найти другую стратегию, использующую меньше памяти за счет, возможно, большего количества вычислений (и менее оптимальной сложности).
Можно видеть, что mask
последовательных строк в основном вычисляют одно и то же, особенно когда np
большое. Действительно, для i=0
мы вычисляем mask = B[0] & B[1] & B[2]
, а для i=1
мы вычисляем mask = B[1] & B[2] & B[3]
. B[1] & B[2]
пересчитывается дважды. Для k последовательных строк np-k
условия разделяются/вычисляются повторно. Таким образом, если np
большое, то операции лучше разложить на множители, чтобы не пересчитывать много слагаемых.
Для этого эффективным подходом является построение бинарного дерева, содержащего частичные редукции. Например, мы можем вычислить node0 = B[0] & B[1]
и node1 = B[2] & B[3]
, а затем node2 = node0 & node1
, поэтому, если мы хотим вычислить B[0] & B[1] & B[2]
, мы можем повторно использовать узлы бинарного дерева. Мы можем показать, что значений узла O(log np)
достаточно для вычисления mask
(т. е. пересечения np
последних строк). Это означает, что это решение эффективно (аналогично БПФ). Однако реализовать его не просто.
Более простое и менее эффективное решение — вычислить пересечение линий кусками размером c
. c
нужно тщательно выбирать: достаточно большой, чтобы избежать повторного вычисления одного и того же объекта несколько раз, и достаточно маленький, чтобы куски действительно были полезны. c = int(ceil(sqrt(np)/2))
, безусловно, является хорошим решением для относительно больших значений np
(без использования фрагментов, безусловно, лучше, когда np
очень мал, как <=4
).
Обратите внимание, что вы можете упаковать логические значения в биты, чтобы значительно ускорить пересечение линий, поскольку результирующие массивы в 8 раз меньше. Это можно сделать с помощью np.packbits
. С помощью этой стратегии логическое И логических значений заменяется побитовым И (что так же быстро, если не быстрее). В то время как строка A
длиной 30_000 требует 234 КБ ОЗУ, связанные логические значения могут храниться только в 4 КБ. Последние могут лучше помещаться в кеши ЦП, ускоряя вычисления.
Обратите внимание, что вычисление может быть остановлено раньше, когда будет найдена совпадающая строка развертки. Также обратите внимание, что если mp << np
, то сначала транспонировать A
может быть значительно быстрее.
Ради производительности можно использовать Cython или Numba для эффективного вычисления (циклы CPython медленные, но не циклы Cython/Numba).
Чтобы найти координаты всех нулевых прямоугольников определенной высоты/ширины, вы можете преобразовать 1 и 0 в количество последовательных нулей по горизонтали и сравнить их с вашей целевой шириной. Затем проделайте то же самое по вертикали с результатами горизонтального сравнения. Все координаты, которые накапливаются до вашего минимального размера по вертикали, будут представлять нижний правый угол нулевых прямоугольников:
import numpy as np
A = np. array([
[0., 0., 1., 1., 1.],
[0., 0., 1., 1., 1.],
[0., 0., 0., 0., 0.],
[0., 1., 0., 0., 1.],
[0., 0., 0., 0., 0.]])
height,width = 3,2
H0 = (A == 0)*(np.arange(A.shape[1])+1)[None,:]
H1 = (A != 0)*(np.arange(A.shape[1])+1)[None,:]
Hz = H0 - np.maximum.accumulate(H1,axis=1)
V0 = (Hz >= width)*(np.arange(A.shape[0])+1)[:,None]
V1 = (Hz < width)*(np.arange(A.shape[0])+1)[:,None]
V2 = V0 - np.maximum.accumulate(V1,axis=0)
bottomRight = np.where(V2 >= height)
topLeft = bottomRight - np.array([[height-1],[width-1]])
print(*zip(*topLeft))
print(*zip(*bottomRight))
(0, 0) (2, 2)
(2, 1) (4, 3)
Это может быть легче понять, изучив промежуточные результаты:
Гц количество последовательных нулей по горизонтали (единицы имеют отрицательные значения)
array([[ 1, 2, -3, -4, -5],
[ 1, 2, -3, -4, -5],
[ 1, 2, 3, 4, 5],
[ 1, -2, 1, 2, -5],
[ 1, 2, 3, 4, 5]])
значения 2 (или более) соответствуют конечным позициям горизонтальных последовательных нулей, которые соответствуют требованию ширины
V2 количество совпадений горизонтального размера по вертикали (Гц >= ширина)
array([[-1, 1, -1, -1, -1],
[-2, 2, -2, -2, -2],
[-3, 3, 1, 1, 1],
[-4, -4, -4, 2, -4],
[-5, 1, 1, 3, 1]])
Позиции, содержащие 3 (или более), соответствуют нижнему правому углу нулевых прямоугольников размером 3x2.
Производительность
Этот подход «всей матрицы» работает медленно на очень больших данных (30 000 x 30 000 занимает 220 секунд). Поскольку вы ищете только одно вхождение, гибридный подход векторизации/цикла может дать лучшие результаты:
V2 = np.zeros(A.shape[1])
rowRange = np.arange(A.shape[1])+1
for r,row in enumerate(A==0):
H0 = row*rowRange
H1 = (~row)*rowRange
Hz = (H0 - np.maximum.accumulate(H1)) >= width
V2 = V2*Hz + Hz
if np.any(V2>=height):
print(time()-start,"found",(r,np.where(V2>=height)[0][0]))
break
В этом гибридном подходе используется тот же метод, но он выполняется построчно, а не по всей матрице. Вы можете оптимизировать, если матрица не является квадратной, выбрав обработку по столбцу или по строке на основе меньшего размера. Поскольку цикл Pyhton медленнее, чем векторизованные вычисления numpy, это должно минимизировать накладные расходы кода Python.
На матрице 30 000 x 30 000 потребовалось всего 9 секунд, когда НЕ было найдено совпадений (что является наихудшим сценарием). Это не быстрее при меньших размерах (например, 500 x 500), но это уменьшает объем памяти, которым управляет numpy, и в конечном итоге догоняет.
В обоих случаях скорость не зависит от размера P.
Как отметил @Julien в комментарии, в целом мы можем использовать раздвижные окна для такого рода задач.
def find_all_zero_region_by_sliding_window(a, shape):
x, y = np.nonzero(np.lib.stride_tricks.sliding_window_view(a, shape).max(axis=-1).max(axis=-1) == 0)
return np.stack((x, y), axis=-1)
find_all_zero_region_by_sliding_window(A, P.shape)
Однако, к сожалению, для этого требуется много памяти.
numpy.core._exceptions.MemoryError: Unable to allocate 11.3 TiB for an array with shape (26001, 29921, 4000) and data type float32
^^^^^^^^
В качестве альтернативы я думаю, что хорошей идеей будет использование таблицы Суммарной площади.
Это похоже на описанный выше подход со скользящим окном, но вместо поиска максимального значения мы можем вычислить сумму (очень эффективно) и найти позицию, в которой она равна нулю.
Обратите внимание, что это предполагает, что A
не содержит отрицательных значений. В противном случае вам пришлось бы использовать numpy.abs
.
Поскольку нам не нужно вычислять сумму любой заданной позиции, я адаптировал эту идею и реализовал ее так, чтобы требовался только однострочный кеш.
import numpy as np
from typing import Tuple
def find_all_zero_region(arr: np.ndarray, kernel_size: Tuple[int, int]) -> np.ndarray:
input_height, input_width = arr.shape
kernel_height, kernel_width = kernel_size
matches = []
# Calculate summed_line for y==0.
summed_line = arr[:kernel_height].sum(axis=0)
for y in range(input_height - kernel_height + 1):
# Update summed_line for row y.
if y != 0: # Except y==0, which already calculated above.
# Adding new row and subtracting old row.
summed_line += arr[y + kernel_height - 1] - arr[y - 1]
# Calculate kernel_sum for (y, 0).
kernel_sum = summed_line[:kernel_width].sum()
if kernel_sum == 0:
matches.append((y, 0))
# Calculate kernel_sum for (y, 1) to (y, right-edge).
# Using the idea of a summed-area table, but in 1D (horizontally).
(all_zero_region_cols,) = np.nonzero(kernel_sum + np.cumsum(summed_line[kernel_width:] - summed_line[:-kernel_width]) == 0)
for col in all_zero_region_cols:
matches.append((y, col + 1))
if not matches:
# For Numba, output must be a 2d array.
return np.zeros((0, 2), dtype=np.int64)
return np.array(matches, dtype=np.int64)
Как видите, здесь используются циклы, но это должно быть намного быстрее, чем вы думаете, потому что требуемая память относительно мала, а количество вычислений/сравнений значительно уменьшено. Вот некоторый временной код.
import time
rng = np.random.default_rng(0)
A = rng.integers(0, 2, size=(30000, 30000)).astype(np.float32)
P = np.zeros(shape=(4000, 80))
# Create an all-zero region in the bottom right corner which will be searched last.
A[-P.shape[0] :, -P.shape[1] :] = 0
started = time.perf_counter()
result = find_all_zero_region(A, P.shape)
print(f"{time.perf_counter() - started} sec")
print(result)
# 3.541154200000001 sec
# [[26000 29920]]
Более того, эту функцию можно сделать еще быстрее, используя Numba. Просто добавьте аннотации следующим образом:
import numba
@numba.njit("int64[:,:](float32[:,:],UniTuple(int64,2))")
def find_all_zero_region_with_numba(arr: np.ndarray, kernel_size: Tuple[int, int]) -> np.ndarray:
...
started = time.perf_counter()
find_all_zero_region_with_numba(A, P.shape)
print(f"{time.perf_counter() - started} sec")
# 1.6005743999999993 sec
Обратите внимание, что я реализовал его, чтобы найти все позиции всех нулевых регионов, но вы также можете заставить его возвращаться к первому. Поскольку он использует циклы, среднее время выполнения будет еще быстрее.
Я предполагаю, что это будет зависеть от того, насколько вероятно/плотно вы ожидаете, что совпадения будут существовать. Я считаю (не совсем уверен в этом), что любой векторизованный код в numpy будет вычислять всю матрицу. Таким образом, цикл будет быстрым, но он не завершится, как только будет найдено первое решение. Если вы напишете свой цикл на питоне, он, вероятно, будет медленнее, но тогда вы легко сможете существовать раньше.