Самый быстрый способ найти ближайших соседей в массиве NumPy

Как быстрее всего выполнить операции над соседними элементами массива mxn на расстоянии $l$ (где m, n большие). Если бы это было изображение, это приравнивалось бы к операции над окружающими пикселями. Чтобы было понятнее, я создал новый массив с соседями соответствующего источника.

Учитывая некоторый массив, например

x = [[1,2,3],
     [4,5,6], 
     [7,8,9]]

Если бы я взял элемент [0,0] и хотел, чтобы окружающие элементы находились в $l$=1, мне понадобились бы элементы [0,1] и [1,0] (намлей 2 и 4). Желаемый результат будет выглядеть примерно так

y = [[[2,4], [1,3,5], [2,6]], 
     [[1,5,7], [4,6,2,8], [3,9,5]],
     [[4,8], [7,5,9], [8,6]]]

Я пытался поиграть с kdTree из scipy.spatial и знаю о https://stackoverflow.com/a/45742628/20451990, но, насколько я могу судить, это на самом деле поиск ближайших точек данных, тогда как я хочу найти ближайшие элементы массива. Я думаю, это можно было бы наивно сделать, перебирая, но это очень медленно...

Конечная цель здесь состоит в том, чтобы сгенерировать комбинации соседних элементов массива, из которых я буду брать произведение. Для приведенного выше примера это может быть

 [[1*2, 1*4], [2*1, 2*3, 2*5], [3*2, 3*6]],...]

Ожидаемый результат кажется неверным: для [1, 0] должно быть [1, 5, 7], а не [1,5]

RomanPerekhrest 14.02.2023 20:47

Как вы, кажется, заметили, классификаторы ближайшего соседа не имеют ничего общего с индексами соседних массивов...

Brian 14.02.2023 20:48

Какую проблему вы пытаетесь решить, делая это? Если это что-то вроде нахождения суммы всех таких «соседних» значений, есть гораздо более эффективные в вычислительном отношении способы сделать это, чем создание рваного массива всех соседей, как вы здесь (например, scipy.signal.convolve2d([[0,1,0],[1,0,1],[0,1,0]], x, "same"))

Brian 14.02.2023 20:53

@Brian Я пытаюсь найти кумулянты x-точки элементов массива на определенном расстоянии. Спасибо за быстрый ответ!

notastringtheorist 14.02.2023 21:07

Это похоже на проблему XY для меня. Генерация этого набора индексов часто является очень плохой идеей для производительности. Даже если есть веская причина, Numpy не поддерживает эффективно зубчатые массивы (поскольку вложенные массивы должны быть объектами, а Numpy не векторизует операции над ними, что приводит к огромным накладным расходам). Дело в том, что результат, который вы предоставляете, представляет собой зубчатый массив... Пожалуйста, опишите, что вы хотите с ним делать (начиная с того, что такое "кумулянты x-точки" и, в частности, как вы хотели бы их вычислить в вашем случае).

Jérôme Richard 14.02.2023 21:11

Я отредактирую вопрос, чтобы сделать его более понятным

notastringtheorist 14.02.2023 21:15

Насколько велики ваши массивы? А какие значения дистанции "Я" вас интересуют? И как вы собираетесь агрегировать, использовать или визуализировать выходной массив, поскольку во многих случаях он будет настолько большим, что просто на глаз увидеть что-либо полезное невозможно..?

np8 15.02.2023 07:35

Каждый массив имеет размер примерно 30x30 элементов, l будет меньше 4, и чтобы ответить на вопрос о визуализации, нет необходимости визуализировать его, это будет связано с моделью Изинга в физике, поэтому мы собираемся экстремировать некоторую энергию.

notastringtheorist 15.02.2023 11:34

Итак, вас интересуют [[1*2, 1*4], [2*1, 2*3, 2*5], [3*2, 3*6]],...] или вы хотите просто получить их максимум, например [[1*4], [2*5], [3*6]],...] в качестве вывода?

np8 15.02.2023 12:17

Во-первых, все они важны :)

notastringtheorist 15.02.2023 15:36
Инструменты для веб-скрапинга с открытым исходным кодом: Python Developer Toolkit
Инструменты для веб-скрапинга с открытым исходным кодом: Python Developer Toolkit
Веб-скрейпинг, как мы все знаем, это дисциплина, которая развивается с течением времени. Появляются все более сложные средства борьбы с ботами, а...
Библиотека для работы с мороженым
Библиотека для работы с мороженым
Лично я попрощался с операторами print() в python. Без шуток.
Эмиссия счетов-фактур с помощью Telegram - Python RPA (BotCity)
Эмиссия счетов-фактур с помощью Telegram - Python RPA (BotCity)
Привет, люди RPA, это снова я и я несу подарки! В очередном моем приключении о том, как создавать ботов для облегчения рутины. Вот, думаю, стоит...
Пошаговое руководство по созданию собственного Slackbot: От установки до развертывания
Пошаговое руководство по созданию собственного Slackbot: От установки до развертывания
Шаг 1: Создание приложения Slack Чтобы создать Slackbot, вам необходимо создать приложение Slack. Войдите в свою учетную запись Slack и перейдите на...
Учебник по веб-скрапингу
Учебник по веб-скрапингу
Привет, ребята... В этот раз мы поговорим о веб-скрейпинге. Целью этого обсуждения будет узнать и понять, что такое веб-скрейпинг, а также узнать, как...
Тонкая настройка GPT-3 с помощью Anaconda
Тонкая настройка GPT-3 с помощью Anaconda
Зарегистрируйте аккаунт Open ai, а затем получите ключ API ниже.
1
10
92
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Основные выводы

  • С numba можно получить примерно в 690 раз более быстрые алгоритмы, чем с наивным кодом Python с циклами for и добавлениями списка.
  • С numba функции имеют сигнатуру; вы явно указываете, что такое тип данных.
  • Избегайте перераспределения памяти. Старайтесь заранее выделять память под любые массивы. По возможности повторно используйте контейнеры данных (см.: cell_result в numbafied process_cell())
  • Numba не очень удобен с классами (по крайней мере, с кодом в стиле ООП), динамически типизированным материалом, контейнерами со смешанными типами или контейнерами, изменяющимися в размере. Отдавайте предпочтение простым функциям и типизированным структурам определенного размера. См. также: Поддерживаемые функции Python
  • Numba любит циклы for, и они быстрые!

Предисловия

Вы просили самый быстрый способ вычислить это. У меня не было базовой линии, поэтому я сначала создал чистое решение для цикла for на Python в качестве базовой линии. Затем я использовал numba, чтобы код работал быстрее. Скорее всего, это не самая быстрая реализация, но, по крайней мере, она намного быстрее, чем наивный подход с использованием цикла for на чистом Python.

Итак, если вы не знакомы с numba, это хороший способ узнать о нем немного :)


Используемые тестовые данные

Я использую две части тестовых данных. Во-первых, простой массив, указанный в вопросе. Я называю это myarr, и он используется для легкого сравнения вывода:

import numpy as np

myarr = np.array(
    [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9],
    ],
    dtype=np.float32,
)

Второй набор данных предназначен для сравнительного анализа. Вы упомянули, что массивы будут иметь размер 30 x 30, а расстояние I будет меньше 4.

arr_large = np.arange(1, 30 * 30 + 1, 1, dtype=np.float32).reshape(30, 30)

Другими словами, arr_large представляет собой двумерный массив 30 x 30:

>>> arr_large
array([[  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.,  11.,
         12.,  13.,  14.,  15.,  16.,  17.,  18.,  19.,  20.,  21.,  22.,
         23.,  24.,  25.,  26.,  27.,  28.,  29.,  30.],
        ...
       [871., 872., 873., 874., 875., 876., 877., 878., 879., 880., 881.,
        882., 883., 884., 885., 886., 887., 888., 889., 890., 891., 892.,
        893., 894., 895., 896., 897., 898., 899., 900.]], dtype=float32)

Я указал dtype, потому что на этапе оптимизации необходимо указать тип данных. Для решения на чистом Python это, конечно, вообще не нужно.

Базовое решение: чистый Python с циклами for

Я реализовал базовое решение с помощью класса python и циклов for. Результат выглядит так (источник NeighbourProcessor ниже):

Пример вывода с входным массивом 3 x 3 (I=1)

n = NeighbourProcessor()
output = n.process(myarr, max_distance=1)

Результат тогда

>>> output
{(0, 0): [2, 4],
 (0, 1): [2, 6, 10],
 (0, 2): [6, 18],
 (1, 0): [4, 20, 28],
 (1, 1): [10, 20, 30, 40],
 (1, 2): [18, 30, 54],
 (2, 0): [28, 56],
 (2, 1): [40, 56, 72],
 (2, 2): [54, 72]}

Что такое же, как

{(0, 0): [1 * 2, 1 * 4],
 (0, 1): [2 * 1, 2 * 3, 2 * 5],
 (0, 2): [3 * 2, 3 * 6],
 (1, 0): [4 * 1, 4 * 5, 4 * 7],
 (1, 1): [5 * 2, 5 * 4, 5 * 6, 5 * 8],
 (1, 2): [6 * 3, 6 * 5, 6 * 9],
 (2, 0): [7 * 4, 7 * 8],
 (2, 1): [8 * 5, 8 * 7, 8 * 9],
 (2, 2): [9 * 6, 9 * 8]}

Это в основном то, что было задано в вопросе; целевой результат был

 [[1*2, 1*4], [2*1, 2*3, 2*5], [3*2, 3*6]],...]

Здесь я использовал словарь с (row, column) в качестве ключа, потому что так вам будет легче найти вывод для каждой ячейки.

Базовая производительность

Для самого большого ввода 30 x 30 и самого большого расстояния (I = 4) расчет на моем ноутбуке занимает около 0,188 секунды:

>>> %timeit n.process(arr_large, max_distance=4)
188 ms ± 19.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Код для NeighbourProcessor

import math
import numpy as np


class NeighbourProcessor:
    def __init__(self):
        self.arr = None

    def process(self, arr, max_distance=1):
        self.arr = arr
        output = dict()
        rows, columns = self.arr.shape
        for current_row in range(rows):
            for current_col in range(columns):
                cell_result = self.process_cell(current_row, current_col, max_distance)
                output[(current_row, current_col)] = cell_result
        return output

    def row_col_is_within_array(self, row, col):
        if row < 0 or col < 0:
            return False
        if row > self.arr.shape[0] - 1 or col > self.arr.shape[1] - 1:
            return False
        return True

    def distance(self, row, col, current_row, current_col):
        distance_squared = (current_row - row) ** 2 + (current_col - col) ** 2
        return np.sqrt(distance_squared)

    def are_neighbours(self, row, col, current_row, current_col, max_distance):
        if row == current_row and col == current_col:
            return False
        if not self.row_col_is_within_array(row, col):
            return False
        return self.distance(row, col, current_row, current_col) <= max_distance

    def neighbours(self, current_row, current_col, max_distance):
        start_row = math.floor(current_row - max_distance)
        start_col = math.floor(current_col - max_distance)
        end_row = math.ceil(current_row + max_distance)
        end_col = math.ceil(current_col + max_distance)
        for row in range(start_row, end_row + 1):
            for col in range(start_col, end_col + 1):
                if self.are_neighbours(
                    row, col, current_row, current_col, max_distance
                ):
                    yield row, col

    def process_cell(self, current_row, current_col, max_distance):
        cell_output = []
        current_cell_value = self.arr[current_row][current_col]
        for row, col in self.neighbours(current_row, current_col, max_distance):
            neighbour_cell_value = self.arr[row][col]
            cell_output.append(current_cell_value * neighbour_cell_value)
        return cell_output

Краткое объяснение

  • Итак, что делает NeighbourProcessor.process, так это проходит по строкам и столбцам входного массива, начиная с (0,0), который является левым верхним углом, и обрабатывает слева направо, сверху вниз до правого нижнего угла, который равен ( n_rows, n_columns), каждый раз помечая ячейку как текущую ячейку; (current_row, current_column).
  • Для каждой текущей ячейки обработайте ее в process_cell. Это сформирует итератор с neighbours(), который перебирает всех соседей на максимальном расстоянии I от текущей ячейки. Вы можете проверить, как проходит логика are_neighbours

Более быстрое решение: использование numba и предварительного выделения памяти

Теперь я сделаю версию только для функций с numba и постараюсь сделать обработку максимально быстрой. Также есть возможность использовать классы в numba, но они все же немного более экспериментальные и сложные, и эту проблему можно решить только с помощью функций. Читабельность кода немного страдает, но это цена, которую мы иногда платим за оптимизацию скорости.

Начну с функции process. Теперь ему нужно будет создать трехмерный массив вместо dict. Причина, по которой мы хотим создать массив заранее, заключается в том, что выделение памяти является дорогостоящим процессом, и мы хотим сделать это ровно один раз. Итак, вместо того, чтобы использовать это как вывод для myarr:

# output[(row,column)]
#
output[(0,0)] # [2,4]
output[(0,1)] # [2, 6, 10]
#..etc

Я хочу, чтобы вывод постоянного размера:

# output[row][column]
#
output[0][0] # [2, 4, nan, nan]
output[0][1] # [2, 6, 10, nan]
#..etc

Обратите внимание, что после всех «пар» выводится np.nan (не число). Любой сценарий постобработки должен просто игнорировать лишние nans.

Решение для требуемого размера для предварительно выделенного массива

Откуда я знаю размер третьего измерения, т.е. количество соседей для заданного макс. расстояние I? Ну, я не знаю. Кажется, это довольно сложная проблема. См., например, это , это или задачу о круге Гаусса в Википедии. Тем не менее, я могу довольно легко вычислить верхнюю границу числа соседей. В дальнейшем я предполагаю, что сосед является соседом тогда и только тогда, когда расстояние от средней точки ячеек меньше или равно I. Если вы будете создавать наброски ручкой и бумагой, вы заметите, что при увеличении количества соседей максимальное количество соседей увеличивается следующим образом:

I = 1 -> max_number_neighbours = 4
I = 2 -> max_number_neighbours = 9
I = 3 -> max_number_neighbours = 28

Вот пример скетча с 2d-массивом 10 x 10 и расстоянием I=3, когда текущая ячейка равна (4,5), количество соседей должно быть меньше или равно 28:

Этот шаблон представлен как функция максимального расстояния (I): (2*I-1)**2 + 4 -1 или

n_third_dimension = max_number_neighbours = (2*I-1)**2 + 3

Рефакторинг кода для работы с numba

Начнем с создания сигнатуры функции точки входа. В этом случае мы создаем функцию process с сигнатурой функции:

@numba.jit("f4[:,:,:](f4[:,:], f4)")
def process(arr, max_distance):
    ...

См. документы для других доступных типов. f4[:,:] просто означает, что ввод представляет собой 2d-массив float32, а f4[:,:,:](....) означает, что выход функции представляет собой 3d-массив float32. Затем мы создаем вывод с формулой, которую мы придумали выше. Вот одна часть волшебства: предварительное выделение памяти с помощью np.empty:

    n_third_dimension = (2 * math.ceil(max_distance) - 1) ** 2 + 3
    output = np.empty((*arr.shape, n_third_dimension), dtype=np.float32)
    cell_result = np.empty(n_third_dimension, dtype=np.float32)

Онемевший код

Я не буду подробно рассматривать остальную часть кода, но вы можете видеть ниже, что это немного модифицированная версия базовой линии цикла for на чистом Python.

import math

import numba
import numpy as np


@numba.njit("f4(i4,i4,i4,i4)")
def distance(row, col, current_row, current_col):
    distance_squared = (current_row - row) ** 2 + (current_col - col) ** 2
    return np.sqrt(distance_squared)


@numba.njit("boolean(i4,i4, i4,i4)")
def row_col_is_within_array(
    row,
    col,
    arr_rows,
    arr_cols,
):
    if row < 0 or col < 0:
        return False
    if row > arr_rows - 1 or col > arr_cols - 1:
        return False
    return True


@numba.njit("boolean(i4,i4,i4,i4,f4,i4,i4)")
def are_neighbours(
    neighbour_row,
    neighbour_col,
    current_row,
    current_col,
    max_distance,
    arr_rows,
    arr_cols,
):
    if neighbour_row == current_row and neighbour_col == current_col:
        return False
    if not row_col_is_within_array(
        neighbour_row,
        neighbour_col,
        arr_rows,
        arr_cols,
    ):
        return False
    return (
        distance(neighbour_row, neighbour_col, current_row, current_col) <= max_distance
    )


@numba.njit("f4[:](f4[:,:], f4[:], i4,i4,i4,f4)")
def process_cell(
    arr, cell_result, current_row, current_col, n_third_dimension, max_distance
):
    for i in range(n_third_dimension):
        cell_result[i] = np.nan

    current_cell_value = arr[current_row][current_col]

    # Potential cell neighbour area
    start_row = math.floor(current_row - max_distance)
    start_col = math.floor(current_col - max_distance)
    end_row = math.ceil(current_row + max_distance)
    end_col = math.ceil(current_col + max_distance)

    arr_rows, arr_cols = arr.shape

    cell_pointer = 0
    for neighbour_row in range(start_row, end_row + 1):
        for neighbour_col in range(start_col, end_col + 1):
            if are_neighbours(
                neighbour_row,
                neighbour_col,
                current_row,
                current_col,
                max_distance,
                arr_rows,
                arr_cols,
            ):
                neighbour_cell_value = arr[neighbour_row][neighbour_col]
                cell_result[cell_pointer] = current_cell_value * neighbour_cell_value
                cell_pointer += 1
    return cell_result


@numba.njit("f4[:,:,:](f4[:,:], f4)")
def process(arr, max_distance):
    n_third_dimension = (2 * math.ceil(max_distance) - 1) ** 2 + 3
    output = np.empty((*arr.shape, n_third_dimension), dtype=np.float32)
    cell_result = np.empty(n_third_dimension, dtype=np.float32)

    rows, columns = arr.shape

    for current_row in range(rows):
        for current_col in range(columns):
            cell_result = process_cell(
                arr,
                cell_result,
                current_row,
                current_col,
                n_third_dimension,
                max_distance,
            )
            output[current_row][current_col][:] = cell_result
    return output

Пример вывода

>>> output = process(myarr, max_distance=1.0)
>>> output
array([[[ 2.,  4., nan, nan],
        [ 2.,  6., 10., nan],
        [ 6., 18., nan, nan]],

       [[ 4., 20., 28., nan],
        [10., 20., 30., 40.],
        [18., 30., 54., nan]],

       [[28., 56., nan, nan],
        [40., 56., 72., nan],

>>> output[0]
array([[ 2.,  4., nan, nan],
       [ 2.,  6., 10., nan],
       [ 6., 18., nan, nan]], dtype=float32)


>>> output[0][1]
array([ 2.,  6., 10., nan], dtype=float32)
# Above is the same as target: [2 * 1, 2 * 3, 2 * 5]

Скорость оцифрованного кода и закрывающих слов

Базовое время выполнения подхода составило 188 мс. Теперь это 271 мкс. Это всего в 0,00144 раза больше, чем исходный код! (Сокращение времени выполнения на 99,85%. Некоторые говорят, что в 693 раза быстрее).

>>> %timeit process(arr_large, max_distance=4.0)
271 µs ± 2.88 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

Обратите внимание, что вы можете захотеть рассчитать расстояние по-другому, или добавить туда взвешивание, или какую-то более сложную логику, функции агрегации и т. д. Это можно еще немного оптимизировать, создав, например, лучшую оценку для максимального количества соседей. Удачи с numba, и я надеюсь, что вы узнали что-то! :)

Дополнительный совет: в numba также есть заблаговременная компиляция, которую вы можете использовать, чтобы сделать первый вызов функции быстрым!

Это отлично, большое спасибо. Более чем отвечает на мой вопрос. Было бы интересно посмотреть, чем реализация Numba отличается от Cython.

notastringtheorist 15.02.2023 21:07

Да, я бы тоже хотел это увидеть! Обычно я просто был доволен «встраиваемым» решением numba, так как относительно легко преобразовать существующий код python для использования numba. Когда-то я пытался создать конкурирующее решение с Cython, но моим первым решениям не хватало далеко не нумба-решения, а оптимизация версии Cython заняла довольно много времени. Я не могу сейчас вспомнить, удалось ли мне сделать версию Cython в этом конкретном случае быстрее, чем версию numba. Но я определенно считаю, что возможность использовать Cython тоже ценна! Я предполагаю, что это открывает новые возможности (относительно numba).

np8 15.02.2023 21:18

Другие вопросы по теме

Объединить значение объекта, содержащее массив: Javascript
Суммируйте цены из массива, пока они не будут соответствовать определенному значению, которое определено в другом массиве.
Разобрать отформатированную строку с метками, написанными ЗАГЛАВНЫМИ БУКВАМИ, за которыми следует их значение, для создания ассоциативного массива.
Массив не расширяется при использовании в if
Создание таблицы из массива в React
Изменение вхождений значения в матрице
Почему адреса распределенного 2D-массива не такие, как я ожидаю?
Является ли мелкая копия массива дешевой, независимо от размера?
Сортировка слиянием в C возвращает список, содержащий дубликаты одних и тех же 3-4 чисел. Другие номера отсутствуют в отсортированном списке
Я застрял. Мне нужно настроить свои циклы, чтобы они продолжали сравнивать два моих массива, но не распечатывали все лишние символы.