Как улучшить работу cwiseProduct?

У меня есть эта функция, которая вызывается несколько раз в моем коде:

    void Grid::computeFVarsSigma(const int DFAType,
                                const Matrix& D_sigma,
                                const Matrix& Phi,
                                const Matrix& DPhiDx,
                                const Matrix& DPhiDy,
                                const Matrix& DPhiDz,
                                Matrix& Rho,
                                Matrix& DRhoDx,
                                Matrix& DRhoDy,
                                Matrix& DRhoDz)
{
    // auto PhiD = Phi * D_sigma;
    Rho = ((Phi * D_sigma).cwiseProduct(Phi)).rowwise().sum();

    if (DFAType == 1)
    {
        DRhoDx = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDx)).rowwise().sum();
        DRhoDy = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDy)).rowwise().sum();
        DRhoDz = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDz)).rowwise().sum();
    }
}

Для варианта использования, который я взял для сравнения, входные массивы имеют следующие размеры:

D_sigma     42 42
Phi     402264 42
DPhiDx  402264 42
DPhiDy  402264 42
DPhiDz  402264 42

Среднее время вызова этой функции 12 раз равно 0.621 seconds, измерено с помощью std::chrono::high_resolution_clock. Я выполняю эти расчеты на AMD Ryzen5, скомпилированном с помощью g++ 7.5.0. Я могу поднять версию компилятора, но сейчас меня больше всего интересует оптимизация кода.

Одна идея, которую я хотел бы изучить, состоит в том, чтобы хранить вычисления cwiseProduct для DRhoDx, DRhoDy и DRhoDz непосредственно в матрице 3xNGridPoints. Однако я пока не знаю, как это сделать.

Есть ли другие манипуляции, которыми я мог бы попытаться улучшить эту функцию?

Заранее благодарим за ваши комментарии.


Я хотел бы поблагодарить @chatz и @Homer512 за очень хорошие предложения. Я был очень доволен однострочной оптимизацией, предложенной @chatz, однако предложения @Homer512 резко изменили производительность, как показано на рисунке ниже (особая благодарность @Homer512!). Я обязательно воспользуюсь обоими предложениями в качестве отправной точки для улучшения других частей моего кода.

Обратите внимание: я использую double и на рисунке ниже возвращаю параметр и возвращаю tuple означает ту же функцию, которая возвращает результат в виде кортежа и параметров соответственно.

Пожалуйста, будьте более конкретны в отношении «входные массивы имеют размеры 1x402264»: так что все (?) Matrix на самом деле являются RowVectors? (Если это так, Phi * D_sigma не будет работать).

chtz 19.01.2023 17:45

Спасибо @chtz, я обновил вопрос информацией о входных матрицах.

rbw 19.01.2023 18:11
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
2
64
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Пусть M=402264, N=42, тогда в вашем случае Phi*D_sigma произведение выполняет M*N² FMA операций, cwiseProduct - сумма M*N FMA операций. Вы можете сохранить некоторую значительную работу, если вычислите Phi * D_sigma только один раз, но вам нужно фактически оценить результат, например.

 Matrix PhiD = Phi * D_sigma;  // DO NOT USE `auto` HERE!
 Rho = PhiD.cwiseProduct(Phi).rowwise().sum();
 if (...) // etc

Я что-то упустил во второй оптимизации? Phi.transpose()*Phi) приводит к матрице 42x42 и выходному вектору из 42 элементов. Оригинал дает выходной вектор из 402264 элементов.

Homer512 20.01.2023 10:57

@ Homer512 Вы правы, я иногда путаю, в каком направлении работают rowwise().sum() и т. д. Я удалил вторую часть ответа на данный момент, может быть, я смогу придумать какой-нибудь хитрый трюк, но я не очень надеюсь. Сохранение для оценки Phi * D_sigma несколько раз уже должно сэкономить много.

chtz 20.01.2023 11:36
Ответ принят как подходящий

Я сделаю эту оптимизацию поэтапно. Сначала мы устанавливаем базовое значение.

Вы не дали определение типа для своего типа Matrix. Поэтому я определяю это как Eigen::MatrixXf. Кроме того, просто для собственного здравомыслия я переопределяю различные Rho-векторы как таковые. Обратите внимание, что Eigen иногда оптимизирует пути кода для векторов по сравнению с матрицами, которые просто являются векторами. Так что в любом случае это хорошая идея, к тому же это упрощает чтение кода.

using Matrix = Eigen::MatrixXf;
using Vector = Eigen::VectorXf;

namespace {
void compute(const Matrix& Phi, const Matrix& D_sigma, const Matrix& DPhi,
             float factor, Vector& Rho)
{
    Rho = (Phi * D_sigma).cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    compute(Phi, D_sigma, Phi, 1.f, Rho);
    if (DFAType == 1) {
        compute(Phi, D_sigma, DPhiDx, 2.f, DRhoDx);
        compute(Phi, D_sigma, DPhiDy, 2.f, DRhoDy);
        compute(Phi, D_sigma, DPhiDz, 2.f, DRhoDz);
    }
}

Первая оптимизация, предложенная @chtz, заключается в кэшировании матричного умножения. Не используйте для этого auto, как указано в документации Eigen.

namespace {
void compute(const Matrix& PhiD, const Matrix& DPhi, float factor, Vector& Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Matrix PhiD = Phi * D_sigma;
    compute(PhiD, Phi, 1.f, Rho);
    if (DFAType == 1) {
        compute(PhiD, DPhiDx, 2.f, DRhoDx);
        compute(PhiD, DPhiDy, 2.f, DRhoDy);
        compute(PhiD, DPhiDz, 2.f, DRhoDz);
    }
}

Теперь это в 3,15 раза быстрее в моей системе.

Второй шаг — уменьшить объем требуемой памяти, выполняя операцию поблочно. Идея довольно проста: мы несколько ограничены пропускной способностью памяти, тем более что произведение матрица-матрица довольно «тонкое». Кроме того, это помогает с шагом после этого.

Здесь я выбираю размер блока 384 строки. Мое эмпирическое правило заключается в том, что входные и выходные данные должны помещаться в кеш L2 (128-256 КБ, возможно, разделенный на 2 потока) и что он должен быть кратным 16 для хорошей векторизации по всем направлениям. 384 rows * 42 columns * 4 byte per float = 64 kiB. Отрегулируйте по мере необходимости для других скалярных типов, но, судя по моим тестам, на самом деле это не очень чувствительно.

Постарайтесь использовать Eigen::Ref или соответствующие шаблоны, чтобы избежать копирования, как я сделал здесь во вспомогательной функции compute.

namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
             float factor, Eigen::Ref<Vector> Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Eigen::Index n = Phi.rows(), blocksize = 384;
    Rho.resize(n);
    if (DFAType == 1)
        for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
            vec->resize(n);
    Matrix PhiD;
    for(Eigen::Index i = 0; i < n; i += blocksize) {
        const Eigen::Index cur = std::min(blocksize, n - i);
        PhiD.noalias() = Phi.middleRows(i, cur) * D_sigma;
        compute(PhiD, Phi.middleRows(i, cur), 1.f, Rho.segment(i, cur));
        if (DFAType == 1) {
            compute(PhiD, DPhiDx.middleRows(i, cur), 2.f,
                    DRhoDx.segment(i, cur));
            compute(PhiD, DPhiDy.middleRows(i, cur), 2.f,
                    DRhoDy.segment(i, cur));
            compute(PhiD, DPhiDz.middleRows(i, cur), 2.f,
                    DRhoDz.segment(i, cur));
        }
    }
}

Это еще одно ускорение в 1,75 раза.

Теперь, когда у нас есть это, мы можем очень легко распараллелить. Eigen может распараллелить матричное умножение внутренне, но не все остальное, поэтому мы делаем все это внешне. Блочная версия работает лучше, потому что она может постоянно загружать все потоки и лучше использовать объединенную емкость кэша L2 системы. Компилировать с помощью -fopenmp

namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
             float factor, Eigen::Ref<Vector> Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Eigen::Index n = Phi.rows(), blocksize = 384;
    Rho.resize(n);
    if (DFAType == 1)
        for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
            vec->resize(n);
#   pragma omp parallel
    {
        Matrix PhiD;
#       pragma omp for nowait
        for(Eigen::Index i = 0; i < n; i += blocksize) {
            const Eigen::Index cur = std::min(blocksize, n - i);
            PhiD.noalias() = Phi.middleRows(i, cur) * D_sigma;
            compute(PhiD, Phi.middleRows(i, cur), 1.f, Rho.segment(i, cur));
            if (DFAType == 1) {
                compute(PhiD, DPhiDx.middleRows(i, cur), 2.f,
                        DRhoDx.segment(i, cur));
                compute(PhiD, DPhiDy.middleRows(i, cur), 2.f,
                        DRhoDy.segment(i, cur));
                compute(PhiD, DPhiDz.middleRows(i, cur), 2.f,
                        DRhoDz.segment(i, cur));
            }
        }
    }
}

Интересно, что это не дает огромного преимущества в моей системе, только коэффициент 1,25 с 8 ядрами / 16 потоками. Я не исследовал, что на самом деле является узким местом. Я предполагаю, что это моя основная пропускная способность памяти. Система с более низкой пропускной способностью на ядро ​​и/или более высокой пропускной способностью на узел (Xeons, Threadrippers) может получить больше преимуществ.

Последнее предложение, но оно ситуативное: транспонируйте матрицы Phi и DPhiDx/y/z. Это позволяет провести две дополнительные оптимизации для матриц с главным столбцом, таких как , используемые Eigen :

  1. Общие матричные умножения выполняются быстрее всего, когда они записываются по образцу A.transpose() * B. Транспонирование элементов в Phi позволяет нам писать PhiD = D_sigma.transpose() * Phi

  2. Сокращения по столбцам быстрее, чем по строкам, за исключением очень небольшого количества столбцов, например, в MatrixX4f

namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
             float factor, Eigen::Ref<Vector> Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).colwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Eigen::Index n = Phi.cols(), blocksize = 384;
    Rho.resize(n);
    if (DFAType == 1)
        for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
            vec->resize(n);
#   pragma omp parallel
    {
        Matrix PhiD;
#       pragma omp for nowait
        for(Eigen::Index i = 0; i < n; i += blocksize) {
            const Eigen::Index cur = std::min(blocksize, n - i);
            PhiD.noalias() = D_sigma.transpose() * Phi.middleCols(i, cur);
            compute(PhiD, Phi.middleCols(i, cur), 1.f, Rho.segment(i, cur));
            if (DFAType == 1) {
                compute(PhiD, DPhiDx.middleCols(i, cur), 2.f,
                        DRhoDx.segment(i, cur));
                compute(PhiD, DPhiDy.middleCols(i, cur), 2.f,
                        DRhoDy.segment(i, cur));
                compute(PhiD, DPhiDz.middleCols(i, cur), 2.f,
                        DRhoDz.segment(i, cur));
            }
        }
    }
}

Это дает еще одно ускорение в 1,14 раза. Я бы предположил большее преимущество, если внутреннее измерение вырастет с 42 до чего-то близкого к 100 или 1000, а также если узкое место выше не будет столь выраженным.

Улучшение за счет разложения

Есть изящный трюк, который вы можете применить для случая (Phi * D_sigma).cwiseProduct(Phi).rowwise().sum():

Пусть p будет вектором-строкой Phi, S будет D_sigma, а d будет скалярным результатом для этой одной строки. Тогда то, что мы вычисляем,

d = p * S * p'

Если S положительно полуопределенно, мы можем использовать разложение LDLT:

S = P' * L * D * L' * P

в матрицу перестановок P, нижнюю треугольную матрицу L и диагональную матрицу D.

Отсюда следует:

d = p * P' * L * D * L' * P * p'
d = (p * P') * (L * sqrt(D)) * (sqrt(D) * L') * (P * p')
d = ||(P * p) * (L * sqrt(D))||^2

(P * p) — это простая перестановка. (L * sqrt(D)) — еще одна быстрая и простая операция, поскольку D — это просто диагональная матрица. Окончательное умножение вектора (P * p) на матрицу (L * sqrt(D)) также дешевле, чем раньше, потому что L — треугольная матрица. Таким образом, вы можете использовать triangularView<Eigen::Lower> Eigen для сохранения операций.

Поскольку декомпозиция может завершиться неудачей, вы должны предоставить исходный подход в качестве запасного варианта.

Другие вопросы по теме