Пытаюсь оптимизировать реализацию ARPLS

Я пытаюсь оптимизировать свой алгоритм подбора спектральной базовой линии (на основе этого), поскольку, насколько я понимаю, производительность NumPy apply_along_axis ненамного выше, чем простое перебор индексов массива.

В настоящее время обработка спектра из ~4000 длин волн занимает ~0,15 секунды, но применительно к миллионам спектров это заняло бы, по меньшей мере, некоторое время. Мне не хватает очевидного векторизованного решения, или алгоритм настолько тяжелый? Пример файла спектров представлен здесь , а его соответствующая базовая линия здесь.

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve


# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)

    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format = "csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        diagonal_weights: dia_array = diags_array(current_weights, dtype=np.float64, format = "dia")
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        baseline[:] = spsolve(diagonal_weights + smoothness_penalies, current_weights * padded_spectrum, permc_spec = "NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

# (M, N) spectra, where M is the number of signals acquired and N, the amount of wavelengths each signal contains
spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter = ",")

baseline = np.apply_along_axis(arpls, axis=1, arr=spectra)
np.savetxt("baseline.csv", baseline, delimiter = ",")

Если вы используете такой инструмент, как line_profiler, где сказано, что он тратит большую часть своего времени?

Nick ODell 23.08.2024 23:14

В этом вопросе отсутствует (полностью) минимальный воспроизводимый пример или подробная информация о профилировании. Кроме того, вы должны показать, что вы пробовали.

Jérôme Richard 23.08.2024 23:23

«применительно к миллионам спектров» Многопоточность является ключом к ускорению независимых вычислений.

Jérôme Richard 23.08.2024 23:23

«Мне не хватает очевидного векторизованного решения?» Что касается кода, я не думаю, что существует какое-то простое решение. Это не редкость для сложных решений, особенно для алгоритмов, работающих с разреженными структурами.

Jérôme Richard 23.08.2024 23:25

«Алгоритм настолько тяжел?» Обычно нет, по крайней мере, до тех пор, пока у вас нет ограничений типа «использование только Numpy или Scipy», которые здесь определенно далеки от оптимального. Например, в этом коде создано много дорогих (бесполезных) временных массивов, и в функции есть цикл (который медленный в CPython, не говоря уже о том, что каждая функция Numpy имеет значительные накладные расходы на небольших массивах).

Jérôme Richard 23.08.2024 23:32

Я загрузил образец файла спектров вместе с полученными базовыми линиями. Обратите внимание, что я ограничил спектры примерно 2000 длинами волн, поскольку в этой точке существует спектральная граница.

srcLegend 24.08.2024 00:04
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
6
51
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Я не думаю, что есть что-то, что могло бы сделать это намного быстрее — большую часть времени он проводит в spsolve(), и это уже написано на C. Чтобы сделать это намного быстрее, вам каким-то образом придется избегать решения этой матрицы, и я этого не делаю. Я недостаточно хорошо разбираюсь в математике, чтобы придумать альтернативу, не требующую решения матрицы.

Тем не менее, я нашел две идеи, которые могут сделать это примерно на 36% быстрее.

  • Более быстрая диагональная манипуляция
  • Кэширование создания матрицы штрафов

Первое, что я сделал, это профилировал программу. Я заметил, что программа тратит много времени на строку spsolve, поэтому разбил ее на несколько строк.

A = diagonal_weights + smoothness_penalies
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec = "NATURAL")

Затем я профилировал это.

    69      1203  569457223.0 473364.3     22.6          A = diagonal_weights + smoothness_penalies
    70      1203    4759976.0   3956.8      0.2          b = current_weights * padded_spectrum
    71      1203 1352957577.0    1e+06     53.7          baseline[:] = spsolve(A, b, permc_spec = "NATURAL")

Расходы 53,7% времени на spsolve трудно сократить по причинам, которые я только что упомянул. Однако 22,6% времени также тратится на сложение двух матриц, что мне кажется безумием. Я думаю, что это некоторая комбинация накладных расходов из-за преобразования DIA в CSC и медленного сложения CSC + CSC.

Более быстрая диагональная манипуляция

В качестве альтернативы я попытался определить индексы диагональных элементов, начальное значение этих элементов, а затем манипулировать диагональными элементами напрямую с помощью NumPy.

# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)

    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format = "csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    smoothness_penalies = smoothness_penalies.tocoo()
    smoothness_penalies.sum_duplicates()
    # smoothness_penalies must have canonical format, or changing to csr could change
    # data order
    assert smoothness_penalies.has_canonical_format
    initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
    
    initial_diag = smoothness_penalies.data[initial_diag_index]
    smoothness_penalies = smoothness_penalies.tocsr()
    assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
    A = smoothness_penalies.copy()

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        A.data[initial_diag_index] = initial_diag + current_weights
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        b = current_weights * padded_spectrum
        baseline[:] = spsolve(A, b, permc_spec = "NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

Это примерно на 26% быстрее.

Кэширование создания матрицы штрафов

Далее я рассмотрел создание матрицы smoothness_penalies. Главное, что следует отметить в этой матрице, это то, что она зависит от трех вещей: размера спектра, коэффициента сглаживания и порядка дифференциации. Они одинаковы для каждого вызова arpls(). Поскольку arpls() вызывается много раз, это хороший кандидат для мемоизации.

Для этого сначала переместите создание smoothness_penalies в отдельную функцию. Затем примените декоратор functools.lru_cache(). Это сокращает время, затрачиваемое на настройку smoothness_penalies с 16% времени выполнения до 0,2%.

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
from functools import lru_cache


@lru_cache
def initialize_smoothness_penalies(size, differentiation_order, smoothing_factor):
    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(size, dtype=np.float64, format = "csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    smoothness_penalies = smoothness_penalies.tocoo()
    smoothness_penalies.sum_duplicates()
    # smoothness_penalies must have canonical format, or changing to csr could change
    # data order
    assert smoothness_penalies.has_canonical_format
    initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
    
    initial_diag = smoothness_penalies.data[initial_diag_index]
    smoothness_penalies = smoothness_penalies.tocsr()
    assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
    return smoothness_penalies, initial_diag, initial_diag_index


# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)
    smoothness_penalies, initial_diag, initial_diag_index = initialize_smoothness_penalies(padded_spectrum.shape[0], differentiation_order, smoothing_factor)

    A = smoothness_penalies.copy()

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        A.data[initial_diag_index] = initial_diag + current_weights
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        b = current_weights * padded_spectrum
        baseline[:] = spsolve(A, b, permc_spec = "NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

Это включает в себя предыдущую оптимизацию и примерно на 36% быстрее, чем исходная реализация.

Сравнение с исходными результатами

Результаты находятся в пределах 10-7 от оригинала, что обусловлено изменением ввода A для spsolve с CSC на CSR.

Вы можете переписать Initialize_smoothness_penalies для предоставления результатов в CSC, но это замедлит работу spsolve.

Дальнейшая работа

Вот набор идей, которые я исследовал, но не смог реализовать. Возможно, вам повезет больше.

  • Итеративные решатели: spsolve — это прямой решатель, но в scipy есть несколько итерационных решателей. Теоретически, если бы вы могли определить хорошее начальное предположение для x0, например, базовый уровень последней итерации, то вы могли бы получить ответ за очень небольшое количество итераций. На практике мне не удалось заставить это работать быстрее, чем прямой решатель. Лучшим итеративным решателем, который я нашел, был cg, который все равно работал медленнее.
  • umfpack: в документации SciPy есть примечание, которое подразумевает, что spsolve на основе umfpack работает быстрее, чем решение SciPy. Однако это зависит от пакета scikit-umfpack, который мне не удалось установить.

Благодаря вашему руководству мне удалось значительно повысить его производительность. Я опубликовал ответ с обновленным кодом для справки.

srcLegend 26.08.2024 04:31

Я попробовал несколько итеративных решателей, но, судя по результатам, я думаю, что это потребует серьезного переписывания алгоритма... Что касается вашего второго пункта о дальнейшей работе, взглянув на несколько статей, на которые ссылается umfpack, я не не думаю, что стоит затрачивать усилия на компиляцию.

srcLegend 26.08.2024 04:36

Простой и наивный способ ускорить эти вычисления — запустить их на нескольких ядрах благодаря нескольким процессам. Вот пример:

def parallel_arpls(spectra):
    results = joblib.Parallel(n_jobs=-1)(joblib.delayed(arpls)(row) for row in spectra)
    return np.array(results)

Вот результаты производительности процессора Ryzen 7 5700U с 8 ядрами:

%timeit -n 1 np.apply_along_axis(arpls, axis=1, arr=spectra)
1.81 s ± 57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit -n 1 parallel_arpls(spectra)
484 ms ± 66.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Это в 3,7 раза быстрее. Это не очень хорошо масштабируется, но имеет то преимущество, что очень просто.

Я думаю, чтобы добиться значительно большей производительности, следует написать более быструю реализацию spsolve. Однако это, безусловно, действительно огромная работа (даже для опытного разработчика). Альтернативным решением может быть реализация альтернативного алгоритма с меньшим количеством итераций или более прямым подходом (для этого требуется хорошее понимание базовой математики).

Благодаря предложениям Ника О'Делла мне удалось повысить производительность более чем на 300% по сравнению с исходным кодом. Вот обновленный алгоритм для справки:

from functools import lru_cache

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import coo_array, csc_array, csr_array, eye_array
from scipy.sparse.linalg import spsolve


# Only need to cache up to two parameter sets
@lru_cache(maxsize=2)
def generate_penalties(
    shape: int,
    differentiation_order: int,
    smoothing_factor: float,
) -> tuple[NDArray[np.floating], csr_array, NDArray[np.floating], NDArray[np.intp]]:
    """
    Generates the initial weights and smoothness penalty matrix used in the ARPLS algorithm.

    This function precomputes the structures needed for penalizing deviations during baseline correction, which are reused across iterations.
    To optimize performance, it caches the results using `lru_cache`, allowing for reuse if the function is called again with the same parameters.

    ### Parameters
    - shape: `int` -- The length of the spectrum for which penalties are generated.
    - differentiation_order: `int` -- The order of the difference operator used in penalization. See the ARPLS docstring for more details.
    - smoothing_factor: `float` -- Controls the smoothness of the resulting baseline. See the ARPLS docstring for more details.

    ### Returns
    - current_weights: `NDArray[floating]` -- Initial weights array, all set to 1, indicating uniform weighting before any iterations.
    - smoothness_penalties: `csr_array` -- A sparse matrix representing the smoothness penalties to be applied during baseline fitting.
    - diagonal: `NDArray[floating]` -- The initial diagonal values of the penalty matrix, stored for efficient updates during iterations.
    - diagonal_indices: `NDArray[intp]` -- Indices where diagonal elements are located in the sparse matrix data array.

    ### Notes
    The caching mechanism assumes that the function is called with similar parameters multiple times, which helps in optimizing performance in repeated baseline fitting tasks.
    """

    # Initialize the weights array to all ones, indicating no preference for any part of the spectrum initially
    weights = np.ones(shape, dtype=np.float32)

    # Create a sparse identity matrix and apply the difference operator iteratively to build the smoothness penalty matrix
    differences = csr_array(eye_array(shape, dtype=np.float64, format = "csr"))
    for _ in range(differentiation_order):
        # Subtracts shifted matrices to build higher-order differences
        differences = csr_array(differences[1:] - differences[:-1])

    # Apply the smoothing factor to the penalty matrix, converting it to a compressed sparse column format for efficient computation
    smoothness_penalties = csc_array(smoothing_factor * (differences.T @ differences))
    smoothness_penalties = coo_array(smoothness_penalties.tocoo())
    # Ensure there are no duplicate entries in the sparse matrix
    smoothness_penalties.sum_duplicates()

    # Identify and store the diagonal elements for quick updates during baseline fitting iterations
    diagonal_indices: NDArray[np.intp] = np.where(smoothness_penalties.row == smoothness_penalties.col)[0]
    diagonal = smoothness_penalties.data[diagonal_indices]
    # Convert to CSR format for solving the linear system
    smoothness_penalties = csr_array(smoothness_penalties.tocsr())

    return weights, smoothness_penalties, diagonal, diagonal_indices


def arpls(
    spectrum: NDArray[np.floating],
    edge_padding: int = 100,
    differentiation_order: int = 2,
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    max_iterations: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using the Asymmetrically Reweighted Penalized Least Squares algorithm.

    This method iteratively fits a smooth baseline to a given spectrum by minimizing the effect of noise while preserving the original signal's characteristics.
    The ARPLS algorithm adjusts the baseline by asymmetrically penalizing negative deviations, making it particularly robust for spectral data with varying baseline shapes.

    ### Parameters
    - spectrum: `NDArray[floating]` -- The 1D spectrum input array.
    - edge_padding: `int` -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.
    - differentiation_order: `int` -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - smoothing_factor: `float` -- Controls the smoothness of the resulting baseline.\
        Larger values suppress small fluctuations, which may lead to a smoother, but less sensitive baseline.
    - convergence_tolerance: `float` -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - max_iterations: `int` -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.

    ### Returns
    - `NDArray[floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Reference
    Asymmetrically Reweighted Penalized Least Squares for Baseline Correction: https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to mitigate boundary effects, which helps in obtaining a more accurate baseline
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
    # Step 2: Generate initial weights, penalty matrix, and indices for diagonal elements
    weights, penalties, diagonal, diagonal_index = generate_penalties(padded_spectrum.shape[0], differentiation_order, smoothing_factor)

    # Step 4: Begin iterative refinement of the baseline using updated weights and penalties
    for _ in range(max_iterations):
        # Step 5a: Update the diagonal elements of the penalty matrix with the current weights.
        penalties.data[diagonal_index] = diagonal + weights
        # Step 5b: Solve the linear system to obtain the new baseline.
        baseline: NDArray[np.floating] = spsolve(penalties, weights * padded_spectrum, permc_spec = "NATURAL", use_umfpack=False)

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Extract the negative residuals, which indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        if not negative_residuals.size:
            break # Stop iterating if residuals are all positive

        # Step 5e: Calculate statistics of the negative residuals.
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5f: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5g: Check for convergence by measuring how much the weights have changed
        if (norm(weights - updated_weights) / norm(weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance

        # Step 5h: Update the weights for the next iteration
        weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

Как предположил Жером Ришар, многопоточность, скорее всего, станет следующим шагом.

Другие вопросы по теме