У меня есть эта функция, которая вызывается несколько раз в моем коде:
void Grid::computeFVarsSigma(const int DFAType,
const Matrix& D_sigma,
const Matrix& Phi,
const Matrix& DPhiDx,
const Matrix& DPhiDy,
const Matrix& DPhiDz,
Matrix& Rho,
Matrix& DRhoDx,
Matrix& DRhoDy,
Matrix& DRhoDz)
{
// auto PhiD = Phi * D_sigma;
Rho = ((Phi * D_sigma).cwiseProduct(Phi)).rowwise().sum();
if (DFAType == 1)
{
DRhoDx = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDx)).rowwise().sum();
DRhoDy = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDy)).rowwise().sum();
DRhoDz = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDz)).rowwise().sum();
}
}
Для варианта использования, который я взял для сравнения, входные массивы имеют следующие размеры:
D_sigma 42 42
Phi 402264 42
DPhiDx 402264 42
DPhiDy 402264 42
DPhiDz 402264 42
Среднее время вызова этой функции 12
раз равно 0.621 seconds
, измерено с помощью std::chrono::high_resolution_clock
. Я выполняю эти расчеты на AMD Ryzen5, скомпилированном с помощью g++ 7.5.0. Я могу поднять версию компилятора, но сейчас меня больше всего интересует оптимизация кода.
Одна идея, которую я хотел бы изучить, состоит в том, чтобы хранить вычисления cwiseProduct для DRhoDx, DRhoDy и DRhoDz непосредственно в матрице 3xNGridPoints. Однако я пока не знаю, как это сделать.
Есть ли другие манипуляции, которыми я мог бы попытаться улучшить эту функцию?
Заранее благодарим за ваши комментарии.
Я хотел бы поблагодарить @chatz и @Homer512 за очень хорошие предложения. Я был очень доволен однострочной оптимизацией, предложенной @chatz, однако предложения @Homer512 резко изменили производительность, как показано на рисунке ниже (особая благодарность @Homer512!). Я обязательно воспользуюсь обоими предложениями в качестве отправной точки для улучшения других частей моего кода.
Обратите внимание: я использую double и на рисунке ниже возвращаю параметр и возвращаю tuple означает ту же функцию, которая возвращает результат в виде кортежа и параметров соответственно.
Спасибо @chtz, я обновил вопрос информацией о входных матрицах.
Пусть M=402264, N=42, тогда в вашем случае Phi*D_sigma
произведение выполняет M*N²
FMA операций, cwiseProduct - сумма M*N
FMA операций. Вы можете сохранить некоторую значительную работу, если вычислите Phi * D_sigma
только один раз, но вам нужно фактически оценить результат, например.
Matrix PhiD = Phi * D_sigma; // DO NOT USE `auto` HERE!
Rho = PhiD.cwiseProduct(Phi).rowwise().sum();
if (...) // etc
Я что-то упустил во второй оптимизации? Phi.transpose()*Phi)
приводит к матрице 42x42 и выходному вектору из 42 элементов. Оригинал дает выходной вектор из 402264 элементов.
@ Homer512 Вы правы, я иногда путаю, в каком направлении работают rowwise().sum()
и т. д. Я удалил вторую часть ответа на данный момент, может быть, я смогу придумать какой-нибудь хитрый трюк, но я не очень надеюсь. Сохранение для оценки Phi * D_sigma
несколько раз уже должно сэкономить много.
Я сделаю эту оптимизацию поэтапно. Сначала мы устанавливаем базовое значение.
Вы не дали определение типа для своего типа Matrix. Поэтому я определяю это как Eigen::MatrixXf
. Кроме того, просто для собственного здравомыслия я переопределяю различные Rho-векторы как таковые. Обратите внимание, что Eigen иногда оптимизирует пути кода для векторов по сравнению с матрицами, которые просто являются векторами. Так что в любом случае это хорошая идея, к тому же это упрощает чтение кода.
using Matrix = Eigen::MatrixXf;
using Vector = Eigen::VectorXf;
namespace {
void compute(const Matrix& Phi, const Matrix& D_sigma, const Matrix& DPhi,
float factor, Vector& Rho)
{
Rho = (Phi * D_sigma).cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */
void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
Vector& DRhoDz)
{
compute(Phi, D_sigma, Phi, 1.f, Rho);
if (DFAType == 1) {
compute(Phi, D_sigma, DPhiDx, 2.f, DRhoDx);
compute(Phi, D_sigma, DPhiDy, 2.f, DRhoDy);
compute(Phi, D_sigma, DPhiDz, 2.f, DRhoDz);
}
}
Первая оптимизация, предложенная @chtz, заключается в кэшировании матричного умножения. Не используйте для этого auto, как указано в документации Eigen.
namespace {
void compute(const Matrix& PhiD, const Matrix& DPhi, float factor, Vector& Rho)
{
Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */
void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
Vector& DRhoDz)
{
const Matrix PhiD = Phi * D_sigma;
compute(PhiD, Phi, 1.f, Rho);
if (DFAType == 1) {
compute(PhiD, DPhiDx, 2.f, DRhoDx);
compute(PhiD, DPhiDy, 2.f, DRhoDy);
compute(PhiD, DPhiDz, 2.f, DRhoDz);
}
}
Теперь это в 3,15 раза быстрее в моей системе.
Второй шаг — уменьшить объем требуемой памяти, выполняя операцию поблочно. Идея довольно проста: мы несколько ограничены пропускной способностью памяти, тем более что произведение матрица-матрица довольно «тонкое». Кроме того, это помогает с шагом после этого.
Здесь я выбираю размер блока 384 строки. Мое эмпирическое правило заключается в том, что входные и выходные данные должны помещаться в кеш L2 (128-256 КБ, возможно, разделенный на 2 потока) и что он должен быть кратным 16 для хорошей векторизации по всем направлениям. 384 rows * 42 columns * 4 byte per float = 64 kiB
. Отрегулируйте по мере необходимости для других скалярных типов, но, судя по моим тестам, на самом деле это не очень чувствительно.
Постарайтесь использовать Eigen::Ref
или соответствующие шаблоны, чтобы избежать копирования, как я сделал здесь во вспомогательной функции compute
.
namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
float factor, Eigen::Ref<Vector> Rho)
{
Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */
void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
Vector& DRhoDz)
{
const Eigen::Index n = Phi.rows(), blocksize = 384;
Rho.resize(n);
if (DFAType == 1)
for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
vec->resize(n);
Matrix PhiD;
for(Eigen::Index i = 0; i < n; i += blocksize) {
const Eigen::Index cur = std::min(blocksize, n - i);
PhiD.noalias() = Phi.middleRows(i, cur) * D_sigma;
compute(PhiD, Phi.middleRows(i, cur), 1.f, Rho.segment(i, cur));
if (DFAType == 1) {
compute(PhiD, DPhiDx.middleRows(i, cur), 2.f,
DRhoDx.segment(i, cur));
compute(PhiD, DPhiDy.middleRows(i, cur), 2.f,
DRhoDy.segment(i, cur));
compute(PhiD, DPhiDz.middleRows(i, cur), 2.f,
DRhoDz.segment(i, cur));
}
}
}
Это еще одно ускорение в 1,75 раза.
Теперь, когда у нас есть это, мы можем очень легко распараллелить. Eigen может распараллелить матричное умножение внутренне, но не все остальное, поэтому мы делаем все это внешне. Блочная версия работает лучше, потому что она может постоянно загружать все потоки и лучше использовать объединенную емкость кэша L2 системы. Компилировать с помощью -fopenmp
namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
float factor, Eigen::Ref<Vector> Rho)
{
Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */
void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
Vector& DRhoDz)
{
const Eigen::Index n = Phi.rows(), blocksize = 384;
Rho.resize(n);
if (DFAType == 1)
for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
vec->resize(n);
# pragma omp parallel
{
Matrix PhiD;
# pragma omp for nowait
for(Eigen::Index i = 0; i < n; i += blocksize) {
const Eigen::Index cur = std::min(blocksize, n - i);
PhiD.noalias() = Phi.middleRows(i, cur) * D_sigma;
compute(PhiD, Phi.middleRows(i, cur), 1.f, Rho.segment(i, cur));
if (DFAType == 1) {
compute(PhiD, DPhiDx.middleRows(i, cur), 2.f,
DRhoDx.segment(i, cur));
compute(PhiD, DPhiDy.middleRows(i, cur), 2.f,
DRhoDy.segment(i, cur));
compute(PhiD, DPhiDz.middleRows(i, cur), 2.f,
DRhoDz.segment(i, cur));
}
}
}
}
Интересно, что это не дает огромного преимущества в моей системе, только коэффициент 1,25 с 8 ядрами / 16 потоками. Я не исследовал, что на самом деле является узким местом. Я предполагаю, что это моя основная пропускная способность памяти. Система с более низкой пропускной способностью на ядро и/или более высокой пропускной способностью на узел (Xeons, Threadrippers) может получить больше преимуществ.
Последнее предложение, но оно ситуативное: транспонируйте матрицы Phi и DPhiDx/y/z. Это позволяет провести две дополнительные оптимизации для матриц с главным столбцом, таких как , используемые Eigen :
Общие матричные умножения выполняются быстрее всего, когда они записываются по образцу A.transpose() * B
. Транспонирование элементов в Phi позволяет нам писать PhiD = D_sigma.transpose() * Phi
Сокращения по столбцам быстрее, чем по строкам, за исключением очень небольшого количества столбцов, например, в MatrixX4f
namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
float factor, Eigen::Ref<Vector> Rho)
{
Rho = PhiD.cwiseProduct(DPhi).colwise().sum() * factor;
}
} /* namespace anonymous */
void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
Vector& DRhoDz)
{
const Eigen::Index n = Phi.cols(), blocksize = 384;
Rho.resize(n);
if (DFAType == 1)
for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
vec->resize(n);
# pragma omp parallel
{
Matrix PhiD;
# pragma omp for nowait
for(Eigen::Index i = 0; i < n; i += blocksize) {
const Eigen::Index cur = std::min(blocksize, n - i);
PhiD.noalias() = D_sigma.transpose() * Phi.middleCols(i, cur);
compute(PhiD, Phi.middleCols(i, cur), 1.f, Rho.segment(i, cur));
if (DFAType == 1) {
compute(PhiD, DPhiDx.middleCols(i, cur), 2.f,
DRhoDx.segment(i, cur));
compute(PhiD, DPhiDy.middleCols(i, cur), 2.f,
DRhoDy.segment(i, cur));
compute(PhiD, DPhiDz.middleCols(i, cur), 2.f,
DRhoDz.segment(i, cur));
}
}
}
}
Это дает еще одно ускорение в 1,14 раза. Я бы предположил большее преимущество, если внутреннее измерение вырастет с 42 до чего-то близкого к 100 или 1000, а также если узкое место выше не будет столь выраженным.
Есть изящный трюк, который вы можете применить для случая (Phi * D_sigma).cwiseProduct(Phi).rowwise().sum()
:
Пусть p
будет вектором-строкой Phi, S
будет D_sigma
, а d
будет скалярным результатом для этой одной строки. Тогда то, что мы вычисляем,
d = p * S * p'
Если S
положительно полуопределенно, мы можем использовать разложение LDLT:
S = P' * L * D * L' * P
в матрицу перестановок P
, нижнюю треугольную матрицу L
и диагональную матрицу D
.
Отсюда следует:
d = p * P' * L * D * L' * P * p'
d = (p * P') * (L * sqrt(D)) * (sqrt(D) * L') * (P * p')
d = ||(P * p) * (L * sqrt(D))||^2
(P * p)
— это простая перестановка. (L * sqrt(D))
— еще одна быстрая и простая операция, поскольку D
— это просто диагональная матрица. Окончательное умножение вектора (P * p)
на матрицу (L * sqrt(D))
также дешевле, чем раньше, потому что L
— треугольная матрица. Таким образом, вы можете использовать triangularView<Eigen::Lower> Eigen для сохранения операций.
Поскольку декомпозиция может завершиться неудачей, вы должны предоставить исходный подход в качестве запасного варианта.
Пожалуйста, будьте более конкретны в отношении «входные массивы имеют размеры
1x402264
»: так что все (?)Matrix
на самом деле являются RowVectors? (Если это так,Phi * D_sigma
не будет работать).