Оптимальное умножение двух 3D-массивов переменной размерности

Я хотел бы перемножить тензоры R = {R_1, R_2, ..., R_M} и X = {X_1, X_2, ..., X_M}, где R_i и X_i — матрицы 3×3 и 3×N_i соответственно. Как я могу максимально использовать функциональные возможности NumPy при формировании массивов R_i × X_i?

Мой MWE следующий:

import numpy as np

np.random.seed(0)

M = 5
R = [np.random.rand(3, 3) for _ in range(M)]
X = []
for i in range(M):
    N_i = np.random.randint(1, 6)
    X_i = np.random.rand(3, N_i)
    X.append(X_i)
    
result = np.zeros((3, 0))
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    result = np.hstack((result, np.dot(R_i, X_i)))

print(result)

Редактировать №1:

Спасибо всем, кто помог мне своими ценными комментариями. Тем временем я размышлял о роли N_i в моей реальной проблеме и пришел к выводу, что количество уникальных N_i на самом деле невелико (от 1 до 5; 2 — наиболее распространенный, но 1 тоже встречается очень часто). Будет ли в этом случае более эффективное решение проблемы умножения?

Еще один важный аспект: на практике я храню 3 × N матрицу X, а не отдельные X_i блоки. Столбцы X не упорядочены относительно. список R. Вместо этого я храню только индексный вектор p, который обеспечивает правильный порядок столбцов X. В этом случае версия einsum будет следующей (по сравнению с «прямым» умножением):

import numpy as np

M = 30
N = 100

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

result_einsum = np.einsum('ijk,ki->ji', R[p], X)

result_direct = np.zeros((3, N))
for i in range(N):
    result_direct[:, i] = np.dot(R[p[i]], X[:, i])

print(np.allclose(result_einsum, result_direct))

Редактировать № 2:

Кажется, Нумба очень помогает:

import numpy as np
import numba
from timeit import Timer

M = 30
N = 100

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

@numba.njit
def numba_direct(R, p, X, result_direct, N):
    for i in range(N):
        p_i = p[i]
        for j in range(3):
            res = 0.0
            for k in range(3):
                res += R[p_i, j, k] * X[k, i]
            result_direct[j, i] = res

result_direct = np.zeros((3, N))
numba_direct(R, p, X, result_direct, N)
result_einsum = np.einsum('ijk,ki->ji', R[p], X)
print(np.allclose(result_einsum, result_direct))

ntimes = 10000

einsum_timer = Timer(lambda: np.einsum('ijk,ki->ji', R[p], X))
einsum_time = einsum_timer.timeit(number=ntimes)

numba_direct_timer = Timer(lambda: numba_direct(R, p, X, result_direct, N))
numba_direct_time = numba_direct_timer.timeit(number=ntimes)

print(f'Einsum runtime: {einsum_time:.4f} seconds')
print(f'Numba direct runtime: {numba_direct_time:.4f} seconds')

Время выполнения приведенного выше кода следующее:

Einsum runtime: 0.0979 seconds
Numba direct runtime: 0.0129 seconds
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
4
0
114
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Я не думаю, что еще что-то нужно сделать.
В зависимости от того, как распределены N_i, например, если половина N_i равна 5, а другая половина 10, может быть полезно иметь два X (один M×5 X для 5 первых столбцов и один (M/2)× 5 X для остальных). То есть, вообще говоря, работать по столбцам, а не по "X"
Или, в некоторых других случаях, заполнить X нулями (например, если N_i равно 10000 для 99% i и 9999 для остальных%).

Но, грубо говоря, numpy предназначен для обработки однородного массива измерений.

Но есть один момент: никогда не используйте hstackstackvstack, append или подобные вещи в цикле for. Это очень медленно. Это подразумевает перераспределение нового массива и копирование всего содержимого предыдущего массива в новый.

Либо используйте простые списки Python для накопления результатов. И делайте укладку только в конце

res=[]
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    res.append(np.dot(R_i, X_i))

np.hstack(res)

Или заранее выделите массив результатов и сохраните результат по мере итерации.

result=np.empty((3,sum(x.shape[1] for x in X)))
k=0
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    result[:,k:k+X_i.shape[1]]=np.dot(R_i,X_i)
    k+=X_i.shape[1]

Спасибо за Ваш ответ. Я боялся, что это так. А также спасибо за ваши инструкции по поводу операций "расширения". Если нет возможности «векторизовать» эту процедуру, то и сохранять блоки в моем случае не нужно, потому что я могу перезаписать X на R*X.

TobiR 27.06.2024 13:30

Если позволите, вам следует немного подождать, чтобы убедиться, что у mozway или hpaulj есть шанс, прежде чем подводить итоги. На самом деле совсем необычно, что они видят способы (например, какой-нибудь причудливый einsum), хотя я думаю, что их нет.

chrslg 27.06.2024 14:03
Ответ принят как подходящий

Я знаю, что я не @mozway и не @hpaulj (это относится к комментарию @chrslg), но действительно кажется, что есть реальное решение с einsum:

np.einsum("ijk,ki->ji",
          np.repeat(R, [x.shape[1] for x in X], axis=0),
          np.hstack(X))

Вот полный код, с которым я тестировал:

import numpy as np
from timeit import Timer

np.random.seed(0)

L, M, timeit_n_times = 3, 5, 10_000
R = [np.random.rand(L, L) for _ in range(M)]
X = []
for i in range(M):
    N_i = np.random.randint(1, 6)
    X_i = np.random.rand(L, N_i)
    X.append(X_i)

def original(R, X):    
    result = np.zeros((L, 0))
    for i in range(M):
        R_i = R[i]
        X_i = X[i]
        result = np.hstack((result, np.dot(R_i, X_i)))
    return result

def list_stack(R, X):
    result = []
    for i in range(M):
        R_i = R[i]
        X_i = X[i]
        result.append(np.dot(R_i, X_i))
    return np.hstack(result)

def preallocate(R, X):
    result=np.empty((L, sum(x.shape[1] for x in X)))
    k=0
    for i in range(M):
        R_i = R[i]
        X_i = X[i]
        result[:, k:k+X_i.shape[1]] = np.dot(R_i, X_i)
        k += X_i.shape[1]
    return result

def with_einsum(R, X):
    return np.einsum("ijk,ki->ji",
                     np.repeat(R, [x.shape[1] for x in X], axis=0),
                     np.hstack(X))
    
assert np.allclose(original(R, X), list_stack(R, X))
assert np.allclose(original(R, X), preallocate(R, X))
assert np.allclose(original(R, X), with_einsum(R, X))

print("original", Timer(lambda: original(R, X)).timeit(timeit_n_times))
print("list_stack", Timer(lambda: list_stack(R, X)).timeit(timeit_n_times))
print("preallocate", Timer(lambda: preallocate(R, X)).timeit(timeit_n_times))
print("with_einsum", Timer(lambda: with_einsum(R, X)).timeit(timeit_n_times))

Некоторые наблюдения:

  • Время расчета list_stack() и preallocate() незначительно отличается, но всегда быстрее, чем original(), что также предлагается и подразумевается в ответе @chrslg.
  • Будет ли with_einsum() быстрее или медленнее остальных, во многом зависит от формы задачи:
    • Для маленьких, но многих L×L матриц (L==3 в вопросе) with_einsum() выигрывает.
      Вот результаты для L, M, timeit_n_times = 3, 500, 10_000:
      original 19.07941191700229
      list_stack 6.437358160997974
      preallocate 7.638774587001535
      with_einsum 3.6944152869982645
      
    • Для больших, но немногих L×L матриц with_einsum() проигрывает.
      Вот результаты для L, M, timeit_n_times = 100, 5, 100_000:
      original 6.661783802999707
      list_stack 4.67112236200046
      preallocate 4.94899292899936
      with_einsum 28.233751856001618
      

Я не проверял влияние размера и вариации N_i.

Обновлять

Основываясь на обновлении вопроса и мыслях @chrslg, я попробовал, будет ли заполнение нулями жизнеспособным способом. Итог: вероятно, нет (по крайней мере, не в данном примере).

Вот новый код тестирования:

from timeit import Timer
import numpy as np

M, N, timeit_n_times = 30, 100, 10_000

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

einsum = lambda: np.einsum('ijk,ki->ji', R[p], X)

def direct():
    res = np.zeros((3, N))
    for i in range(N):
        res[:, i] = np.dot(R[p[i]], X[:, i])
    return res

# Rearrange data for padding
max_count = np.max(np.unique(p, return_counts=True)[1])
counts = np.zeros(M, dtype=int)

X_padded = np.zeros((M, max_count, 3))
for i, idx in enumerate(p):
    X_padded[idx, counts[idx]] = X[:, i]
    counts[idx] += 1

padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)
    
result_einsum = einsum()
result_direct = direct()
result_padded_raw = padded()

# Extract padding result (reverse steps of getting from X to X_padded)
result_padded = np.zeros((3, N))
counts = np.zeros(M, dtype=int)
for i, idx in enumerate(p):
    result_padded[:, i] = result_padded_raw[idx, counts[idx]]
    counts[idx] += 1

assert np.allclose(result_einsum, result_direct)
assert np.allclose(result_padded, result_direct)

print("einsum", Timer(einsum).timeit(timeit_n_times))
print("direct", Timer(direct).timeit(timeit_n_times))
print("padded", Timer(padded).timeit(timeit_n_times))

Здесь мы сначала переставляем данные в X_padded, который имеет M×max_count×3-образную форму, где max_count — максимальное количество ссылок на один из индексов в R из p. Мы итеративно заполняем X_padded для каждого индекса, оставляя неиспользуемое пространство заполненным нулями. В конце концов, мы снова можем использовать версию einsum для вычисления результата (padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)). Если мы хотим сравнить результат с результатами других методов, нам нужно переставить его обратно, по сути инвертируя шаги перехода от X к X_padded.

Наблюдения:

  • С точки зрения памяти мы имеем:

    • Сохранено на стороне R, так как нам больше не нужно копировать отдельные матрицы 3×3.
    • Оплачивается на стороне X, так как нам нужно заполнение нулями.
  • По скорости мы, кажется, немного проиграли;
    вот результаты для M, N, timeit_n_times = 30, 100, 100_000:

    einsum 0.7587459350002064
    direct 13.305958173999898
    padded 1.502786388000004
    

    Обратите внимание, что сюда даже не входит время на перестановку данных. Так что, вероятно, отступы здесь не помогут – по крайней мере, не так, как я пытался.

Спасибо, @simon, за твои тщательные тесты и полезные объяснения. В моих случаях L = 3 фиксировано (матрицы R_i представляют собой трехмерное вращение), а N_i и M изменяются (на самом деле у меня есть M кластеров с объектами N_1, N_2, ..., N_M, которые я хочу повернуть). При таком ограничении кажется, что для большого диапазона M или N_ieinsum работает лучше, чем list_stack, но когда оба параметра малы, list_stack работает быстрее. Я предпочитаю версию einsum, потому что она довольно компактная. Еще раз спасибо за вашу любезную помощь.

TobiR 27.06.2024 17:00

@TobiR Спасибо за ваш отзыв. Что касается компактности версии einsum: хотя она выглядит компактнее, чем есть на самом деле. Что происходит в np.repeat(R, [x.shape[1] for x in X], axis=0), так это то, что каждая матрица в R повторяется столько раз, сколько столбцов имеется в соответствующей матрице в X. Это тратит огромное количество памяти и времени на избыточные данные (я предполагаю). Однако я также не нашел лучшего решения из-за переменных форм в X (о чем уже говорил @chrslg). Так что, возможно, есть еще лучший способ решить эту проблему.

simon 27.06.2024 17:11

Итак, вы используете отдельный R для каждого столбца объединенных массивов X.

hpaulj 27.06.2024 18:10

@hpaulj Точно. Лучшего способа я не нашел из-за разных форм в X. Я также попробовал использовать список (реплицированных) матриц R вместо использования np.repeat(), чтобы получить только дублированные ссылки, а не дублированную память. Это работало так же хорошо, но было немного медленнее. Правда, тест скорости я не сохранял и память не мониторил ни в коем случае.

simon 27.06.2024 22:40

Приятно (должен признаться, что когда я получил степень бакалавра и предложил немного подождать, прежде чем присуждать ее, я не совсем ожидал из-за неоднородности форм, что мой степень бакалавра действительно находится под угрозой :D). Для N_i, я думаю, лучше, если они тоже не будут слишком большими из-за повтора. Но если они достаточно большие, их повторение можно было бы дополнить 0 и заменить каким-нибудь трюком с шагом. Это зависит от того, насколько они разные N_i. Итог: лучший способ — протестировать на реальных данных.

chrslg 28.06.2024 00:35

@chrslg Спасибо и полностью согласен с вашим выводом. Честно говоря, я тоже подумал, что было немного рано, когда ОП присвоил степень бакалавра. (Это не значит, что я думаю, что ваше решение того не стоило бы, но присуждение степени бакалавра без альтернатив и без некоторого ожидания, вероятно, было не идеальным – вероятно, даже на данном этапе еще слишком рано.) Я также согласен, что стоит попробовать отступы, в зависимости от фактического распределения N_i.

simon 28.06.2024 09:33

Спасибо за ваши комментарии и извините за раннюю пометку другого ответа как BA (я регулярно писал в SO по другим темам, где обычно получал только один ответ). Я немного отредактировал свой вопрос, а также сделал несколько замечаний по поводу N_i в своем практическом применении.

TobiR 28.06.2024 12:44

@TobiR Спасибо за ваш отзыв! Что касается вашего обновления/редактирования: на данный момент у меня заканчиваются идеи, и я предполагаю, что вы не сможете работать намного быстрее, чем с вашей версией einsum (кроме, возможно, попытки ускорения с помощью чего-то вроде Numba).

simon 28.06.2024 14:00

@TobiR Я добавил код и провел эксперимент с заполнением нулями. См. раздел Обновление.

simon 28.06.2024 16:04

@simon Спасибо за ваши новости. Я пробовал Numba с наивной (подобной Фортрану) реализацией, и это действительно очень помогает (см. Редактирование № 2 в моем вопросе). Сначала я тупо замерял время компиляции вместе с повторными выполнениями, но теперь результаты вполне обнадеживают.

TobiR 28.06.2024 16:36

@TobiR Звучит хорошо! Возможно, вы захотите превратить это обновление в принятый ответ: D

simon 28.06.2024 16:54

Другие вопросы по теме