Как быстрее всего выполнить операции над соседними элементами массива mxn на расстоянии $l$ (где m, n большие). Если бы это было изображение, это приравнивалось бы к операции над окружающими пикселями. Чтобы было понятнее, я создал новый массив с соседями соответствующего источника.
Учитывая некоторый массив, например
x = [[1,2,3],
[4,5,6],
[7,8,9]]
Если бы я взял элемент [0,0] и хотел, чтобы окружающие элементы находились в $l$=1, мне понадобились бы элементы [0,1] и [1,0] (намлей 2 и 4). Желаемый результат будет выглядеть примерно так
y = [[[2,4], [1,3,5], [2,6]],
[[1,5,7], [4,6,2,8], [3,9,5]],
[[4,8], [7,5,9], [8,6]]]
Я пытался поиграть с kdTree из scipy.spatial и знаю о https://stackoverflow.com/a/45742628/20451990, но, насколько я могу судить, это на самом деле поиск ближайших точек данных, тогда как я хочу найти ближайшие элементы массива. Я думаю, это можно было бы наивно сделать, перебирая, но это очень медленно...
Конечная цель здесь состоит в том, чтобы сгенерировать комбинации соседних элементов массива, из которых я буду брать произведение. Для приведенного выше примера это может быть
[[1*2, 1*4], [2*1, 2*3, 2*5], [3*2, 3*6]],...]
Как вы, кажется, заметили, классификаторы ближайшего соседа не имеют ничего общего с индексами соседних массивов...
Какую проблему вы пытаетесь решить, делая это? Если это что-то вроде нахождения суммы всех таких «соседних» значений, есть гораздо более эффективные в вычислительном отношении способы сделать это, чем создание рваного массива всех соседей, как вы здесь (например, scipy.signal.convolve2d([[0,1,0],[1,0,1],[0,1,0]], x, "same"))
@Brian Я пытаюсь найти кумулянты x-точки элементов массива на определенном расстоянии. Спасибо за быстрый ответ!
Это похоже на проблему XY для меня. Генерация этого набора индексов часто является очень плохой идеей для производительности. Даже если есть веская причина, Numpy не поддерживает эффективно зубчатые массивы (поскольку вложенные массивы должны быть объектами, а Numpy не векторизует операции над ними, что приводит к огромным накладным расходам). Дело в том, что результат, который вы предоставляете, представляет собой зубчатый массив... Пожалуйста, опишите, что вы хотите с ним делать (начиная с того, что такое "кумулянты x-точки" и, в частности, как вы хотели бы их вычислить в вашем случае).
Я отредактирую вопрос, чтобы сделать его более понятным
Насколько велики ваши массивы? А какие значения дистанции "Я" вас интересуют? И как вы собираетесь агрегировать, использовать или визуализировать выходной массив, поскольку во многих случаях он будет настолько большим, что просто на глаз увидеть что-либо полезное невозможно..?
Каждый массив имеет размер примерно 30x30 элементов, l будет меньше 4, и чтобы ответить на вопрос о визуализации, нет необходимости визуализировать его, это будет связано с моделью Изинга в физике, поэтому мы собираемся экстремировать некоторую энергию.
Итак, вас интересуют [[1*2, 1*4], [2*1, 2*3, 2*5], [3*2, 3*6]],...] или вы хотите просто получить их максимум, например [[1*4], [2*5], [3*6]],...] в качестве вывода?
Во-первых, все они важны :)
cell_result
в numbafied process_cell()
)Вы просили самый быстрый способ вычислить это. У меня не было базовой линии, поэтому я сначала создал чистое решение для цикла for на Python в качестве базовой линии. Затем я использовал numba, чтобы код работал быстрее. Скорее всего, это не самая быстрая реализация, но, по крайней мере, она намного быстрее, чем наивный подход с использованием цикла for на чистом Python.
Итак, если вы не знакомы с numba, это хороший способ узнать о нем немного :)
Я использую две части тестовых данных. Во-первых, простой массив, указанный в вопросе. Я называю это myarr, и он используется для легкого сравнения вывода:
import numpy as np
myarr = np.array(
[
[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
],
dtype=np.float32,
)
Второй набор данных предназначен для сравнительного анализа. Вы упомянули, что массивы будут иметь размер 30 x 30, а расстояние I будет меньше 4.
arr_large = np.arange(1, 30 * 30 + 1, 1, dtype=np.float32).reshape(30, 30)
Другими словами, arr_large представляет собой двумерный массив 30 x 30:
>>> arr_large
array([[ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11.,
12., 13., 14., 15., 16., 17., 18., 19., 20., 21., 22.,
23., 24., 25., 26., 27., 28., 29., 30.],
...
[871., 872., 873., 874., 875., 876., 877., 878., 879., 880., 881.,
882., 883., 884., 885., 886., 887., 888., 889., 890., 891., 892.,
893., 894., 895., 896., 897., 898., 899., 900.]], dtype=float32)
Я указал dtype, потому что на этапе оптимизации необходимо указать тип данных. Для решения на чистом Python это, конечно, вообще не нужно.
Я реализовал базовое решение с помощью класса python и циклов for. Результат выглядит так (источник NeighbourProcessor ниже):
n = NeighbourProcessor()
output = n.process(myarr, max_distance=1)
Результат тогда
>>> output
{(0, 0): [2, 4],
(0, 1): [2, 6, 10],
(0, 2): [6, 18],
(1, 0): [4, 20, 28],
(1, 1): [10, 20, 30, 40],
(1, 2): [18, 30, 54],
(2, 0): [28, 56],
(2, 1): [40, 56, 72],
(2, 2): [54, 72]}
Что такое же, как
{(0, 0): [1 * 2, 1 * 4],
(0, 1): [2 * 1, 2 * 3, 2 * 5],
(0, 2): [3 * 2, 3 * 6],
(1, 0): [4 * 1, 4 * 5, 4 * 7],
(1, 1): [5 * 2, 5 * 4, 5 * 6, 5 * 8],
(1, 2): [6 * 3, 6 * 5, 6 * 9],
(2, 0): [7 * 4, 7 * 8],
(2, 1): [8 * 5, 8 * 7, 8 * 9],
(2, 2): [9 * 6, 9 * 8]}
Это в основном то, что было задано в вопросе; целевой результат был
[[1*2, 1*4], [2*1, 2*3, 2*5], [3*2, 3*6]],...]
Здесь я использовал словарь с (row, column) в качестве ключа, потому что так вам будет легче найти вывод для каждой ячейки.
Для самого большого ввода 30 x 30 и самого большого расстояния (I = 4) расчет на моем ноутбуке занимает около 0,188 секунды:
>>> %timeit n.process(arr_large, max_distance=4)
188 ms ± 19.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
import math
import numpy as np
class NeighbourProcessor:
def __init__(self):
self.arr = None
def process(self, arr, max_distance=1):
self.arr = arr
output = dict()
rows, columns = self.arr.shape
for current_row in range(rows):
for current_col in range(columns):
cell_result = self.process_cell(current_row, current_col, max_distance)
output[(current_row, current_col)] = cell_result
return output
def row_col_is_within_array(self, row, col):
if row < 0 or col < 0:
return False
if row > self.arr.shape[0] - 1 or col > self.arr.shape[1] - 1:
return False
return True
def distance(self, row, col, current_row, current_col):
distance_squared = (current_row - row) ** 2 + (current_col - col) ** 2
return np.sqrt(distance_squared)
def are_neighbours(self, row, col, current_row, current_col, max_distance):
if row == current_row and col == current_col:
return False
if not self.row_col_is_within_array(row, col):
return False
return self.distance(row, col, current_row, current_col) <= max_distance
def neighbours(self, current_row, current_col, max_distance):
start_row = math.floor(current_row - max_distance)
start_col = math.floor(current_col - max_distance)
end_row = math.ceil(current_row + max_distance)
end_col = math.ceil(current_col + max_distance)
for row in range(start_row, end_row + 1):
for col in range(start_col, end_col + 1):
if self.are_neighbours(
row, col, current_row, current_col, max_distance
):
yield row, col
def process_cell(self, current_row, current_col, max_distance):
cell_output = []
current_cell_value = self.arr[current_row][current_col]
for row, col in self.neighbours(current_row, current_col, max_distance):
neighbour_cell_value = self.arr[row][col]
cell_output.append(current_cell_value * neighbour_cell_value)
return cell_output
NeighbourProcessor.process
, так это проходит по строкам и столбцам входного массива, начиная с (0,0), который является левым верхним углом, и обрабатывает слева направо, сверху вниз до правого нижнего угла, который равен ( n_rows, n_columns), каждый раз помечая ячейку как текущую ячейку; (current_row
, current_column
).process_cell
. Это сформирует итератор с neighbours()
, который перебирает всех соседей на максимальном расстоянии I
от текущей ячейки. Вы можете проверить, как проходит логика are_neighbours
Теперь я сделаю версию только для функций с numba и постараюсь сделать обработку максимально быстрой. Также есть возможность использовать классы в numba, но они все же немного более экспериментальные и сложные, и эту проблему можно решить только с помощью функций. Читабельность кода немного страдает, но это цена, которую мы иногда платим за оптимизацию скорости.
Начну с функции process. Теперь ему нужно будет создать трехмерный массив вместо dict. Причина, по которой мы хотим создать массив заранее, заключается в том, что выделение памяти является дорогостоящим процессом, и мы хотим сделать это ровно один раз. Итак, вместо того, чтобы использовать это как вывод для myarr:
# output[(row,column)]
#
output[(0,0)] # [2,4]
output[(0,1)] # [2, 6, 10]
#..etc
Я хочу, чтобы вывод постоянного размера:
# output[row][column]
#
output[0][0] # [2, 4, nan, nan]
output[0][1] # [2, 6, 10, nan]
#..etc
Обратите внимание, что после всех «пар» выводится np.nan (не число). Любой сценарий постобработки должен просто игнорировать лишние nans.
Откуда я знаю размер третьего измерения, т.е. количество соседей для заданного макс. расстояние I? Ну, я не знаю. Кажется, это довольно сложная проблема. См., например, это , это или задачу о круге Гаусса в Википедии. Тем не менее, я могу довольно легко вычислить верхнюю границу числа соседей. В дальнейшем я предполагаю, что сосед является соседом тогда и только тогда, когда расстояние от средней точки ячеек меньше или равно I. Если вы будете создавать наброски ручкой и бумагой, вы заметите, что при увеличении количества соседей максимальное количество соседей увеличивается следующим образом:
I = 1 -> max_number_neighbours = 4
I = 2 -> max_number_neighbours = 9
I = 3 -> max_number_neighbours = 28
Вот пример скетча с 2d-массивом 10 x 10 и расстоянием I=3, когда текущая ячейка равна (4,5), количество соседей должно быть меньше или равно 28:
Этот шаблон представлен как функция максимального расстояния (I): (2*I-1)**2 + 4 -1 или
n_third_dimension = max_number_neighbours = (2*I-1)**2 + 3
Начнем с создания сигнатуры функции точки входа. В этом случае мы создаем функцию process с сигнатурой функции:
@numba.jit("f4[:,:,:](f4[:,:], f4)")
def process(arr, max_distance):
...
См. документы для других доступных типов. f4[:,:] просто означает, что ввод представляет собой 2d-массив float32, а f4[:,:,:](....) означает, что выход функции представляет собой 3d-массив float32. Затем мы создаем вывод с формулой, которую мы придумали выше. Вот одна часть волшебства: предварительное выделение памяти с помощью np.empty:
n_third_dimension = (2 * math.ceil(max_distance) - 1) ** 2 + 3
output = np.empty((*arr.shape, n_third_dimension), dtype=np.float32)
cell_result = np.empty(n_third_dimension, dtype=np.float32)
Я не буду подробно рассматривать остальную часть кода, но вы можете видеть ниже, что это немного модифицированная версия базовой линии цикла for на чистом Python.
import math
import numba
import numpy as np
@numba.njit("f4(i4,i4,i4,i4)")
def distance(row, col, current_row, current_col):
distance_squared = (current_row - row) ** 2 + (current_col - col) ** 2
return np.sqrt(distance_squared)
@numba.njit("boolean(i4,i4, i4,i4)")
def row_col_is_within_array(
row,
col,
arr_rows,
arr_cols,
):
if row < 0 or col < 0:
return False
if row > arr_rows - 1 or col > arr_cols - 1:
return False
return True
@numba.njit("boolean(i4,i4,i4,i4,f4,i4,i4)")
def are_neighbours(
neighbour_row,
neighbour_col,
current_row,
current_col,
max_distance,
arr_rows,
arr_cols,
):
if neighbour_row == current_row and neighbour_col == current_col:
return False
if not row_col_is_within_array(
neighbour_row,
neighbour_col,
arr_rows,
arr_cols,
):
return False
return (
distance(neighbour_row, neighbour_col, current_row, current_col) <= max_distance
)
@numba.njit("f4[:](f4[:,:], f4[:], i4,i4,i4,f4)")
def process_cell(
arr, cell_result, current_row, current_col, n_third_dimension, max_distance
):
for i in range(n_third_dimension):
cell_result[i] = np.nan
current_cell_value = arr[current_row][current_col]
# Potential cell neighbour area
start_row = math.floor(current_row - max_distance)
start_col = math.floor(current_col - max_distance)
end_row = math.ceil(current_row + max_distance)
end_col = math.ceil(current_col + max_distance)
arr_rows, arr_cols = arr.shape
cell_pointer = 0
for neighbour_row in range(start_row, end_row + 1):
for neighbour_col in range(start_col, end_col + 1):
if are_neighbours(
neighbour_row,
neighbour_col,
current_row,
current_col,
max_distance,
arr_rows,
arr_cols,
):
neighbour_cell_value = arr[neighbour_row][neighbour_col]
cell_result[cell_pointer] = current_cell_value * neighbour_cell_value
cell_pointer += 1
return cell_result
@numba.njit("f4[:,:,:](f4[:,:], f4)")
def process(arr, max_distance):
n_third_dimension = (2 * math.ceil(max_distance) - 1) ** 2 + 3
output = np.empty((*arr.shape, n_third_dimension), dtype=np.float32)
cell_result = np.empty(n_third_dimension, dtype=np.float32)
rows, columns = arr.shape
for current_row in range(rows):
for current_col in range(columns):
cell_result = process_cell(
arr,
cell_result,
current_row,
current_col,
n_third_dimension,
max_distance,
)
output[current_row][current_col][:] = cell_result
return output
>>> output = process(myarr, max_distance=1.0)
>>> output
array([[[ 2., 4., nan, nan],
[ 2., 6., 10., nan],
[ 6., 18., nan, nan]],
[[ 4., 20., 28., nan],
[10., 20., 30., 40.],
[18., 30., 54., nan]],
[[28., 56., nan, nan],
[40., 56., 72., nan],
>>> output[0]
array([[ 2., 4., nan, nan],
[ 2., 6., 10., nan],
[ 6., 18., nan, nan]], dtype=float32)
>>> output[0][1]
array([ 2., 6., 10., nan], dtype=float32)
# Above is the same as target: [2 * 1, 2 * 3, 2 * 5]
Базовое время выполнения подхода составило 188 мс. Теперь это 271 мкс. Это всего в 0,00144 раза больше, чем исходный код! (Сокращение времени выполнения на 99,85%. Некоторые говорят, что в 693 раза быстрее).
>>> %timeit process(arr_large, max_distance=4.0)
271 µs ± 2.88 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Обратите внимание, что вы можете захотеть рассчитать расстояние по-другому, или добавить туда взвешивание, или какую-то более сложную логику, функции агрегации и т. д. Это можно еще немного оптимизировать, создав, например, лучшую оценку для максимального количества соседей. Удачи с numba, и я надеюсь, что вы узнали что-то! :)
Дополнительный совет: в numba также есть заблаговременная компиляция, которую вы можете использовать, чтобы сделать первый вызов функции быстрым!
Это отлично, большое спасибо. Более чем отвечает на мой вопрос. Было бы интересно посмотреть, чем реализация Numba отличается от Cython.
Да, я бы тоже хотел это увидеть! Обычно я просто был доволен «встраиваемым» решением numba, так как относительно легко преобразовать существующий код python для использования numba. Когда-то я пытался создать конкурирующее решение с Cython, но моим первым решениям не хватало далеко не нумба-решения, а оптимизация версии Cython заняла довольно много времени. Я не могу сейчас вспомнить, удалось ли мне сделать версию Cython в этом конкретном случае быстрее, чем версию numba. Но я определенно считаю, что возможность использовать Cython тоже ценна! Я предполагаю, что это открывает новые возможности (относительно numba).
Ожидаемый результат кажется неверным: для [1, 0] должно быть [1, 5, 7], а не [1,5]