I'm trying to optimize my spectral baseline fitting algorithm (based on this one), since, as I understand it, the performance of NumPy's apply_along_axis is not much better than simply looping over the array indices.
Currently, processing a spectrum of ~4000 wavelengths takes ~0.15 seconds, but applied to millions of spectra it would take quite a while, to say the least. Am I missing an obvious vectorized solution, or is the algorithm just that heavy? A sample spectra file is available here, and its corresponding baseline here.
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
# Sparse array implementation
def arpls(
spectrum: NDArray[np.floating],
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
differentiation_order: int = 2,
max_iterations: int = 100,
edge_padding: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
### Parameters
- spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
- smoothing_factor: `float`, optional -- Smoothing strength parameter.\
The larger the parameter, the smoother the resulting baseline.\
High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
- convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- max_iterations: `int`, optional -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
- edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
### Returns
- `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Raises
- `ValueError` -- If the ratio is not between 0 and 1.
### Reference
- https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
# Step 2: Initialize the weights array.
current_weights = np.ones_like(padded_spectrum)
# Step 3: Initialize a difference array for enforcing smoothness
differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format = "csr")
for _ in range(differentiation_order):
# Update the difference array to account for higher-order differentials if required
differences = differences[1:] - differences[:-1]
# Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
# Step 5: Iteratively refine the estimated baseline
baseline = np.zeros_like(padded_spectrum)
for _ in range(max_iterations):
# Step 5a: Generate a diagonal weights array from the current weights
diagonal_weights: dia_array = diags_array(current_weights, dtype=np.float64, format = "dia")
# Step 5b: Solve for the baseline using the current weights and smoothness penalties
baseline[:] = spsolve(diagonal_weights + smoothness_penalies, current_weights * padded_spectrum, permc_spec = "NATURAL")
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5f: Check for convergence by measuring how much the weights have changed
if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
current_weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
# (M, N) spectra, where M is the number of signals acquired and N, the amount of wavelengths each signal contains
spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter = ",")
baseline = np.apply_along_axis(arpls, axis=1, arr=spectra)
np.savetxt("baseline.csv", baseline, delimiter = ",")
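For context, my understanding is that the apply_along_axis call above is essentially just an explicit Python loop over the rows, something like this sketch (not part of my actual pipeline):
# Rough loop equivalent of the apply_along_axis call above; apply_along_axis
# offers little to no speedup over this explicit iteration.
baseline = np.empty(spectra.shape, dtype=np.float64)
for index, spectrum in enumerate(spectra):
    baseline[index] = arpls(spectrum)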
This question is (completely) missing a minimal reproducible example and any detailed profiling information. Also, you should show what you have tried.
"applied to millions of spectra" -- Multithreading is key to speeding up independent computations.
"Am I missing an obvious vectorized solution?" As for the code, I don't think there is a simple one. That is not unusual for complex solutions, especially for algorithms working on sparse structures.
"Is the algorithm just that heavy?" Usually not, at least as long as you don't have constraints like "use only NumPy or SciPy", which are definitely far from optimal here. For example, this code creates a lot of expensive (and useless) temporary arrays, and there is a loop inside the function (which is slow in CPython, not to mention that every NumPy function has significant overhead on small arrays).
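As an illustration of the temporary-array point (my own sketch, not taken from the code above), the weight-update expression allocates a fresh array for every intermediate result, whereas in-place ufuncs can reuse a single buffer:
import numpy as np

residuals = np.random.default_rng(0).normal(size=4200)  # stand-in for padded_spectrum - baseline
negative = residuals[residuals < 0]
nr_mean, nr_deviation = negative.mean(), negative.std()

# Original style: every operation below allocates a new temporary array
expected = 1.0 / (1.0 + np.exp(np.clip(2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation, -500, 500)))

# In-place style: the whole update reuses one buffer
buffer = residuals.copy()
buffer -= 2 * nr_deviation - nr_mean
buffer *= 2 / nr_deviation
np.clip(buffer, -500, 500, out=buffer)
np.exp(buffer, out=buffer)
buffer += 1.0
np.reciprocal(buffer, out=buffer)
assert np.allclose(expected, buffer)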
I've uploaded a sample spectra file along with the resulting baselines. Note that I limited the spectra to about 2000 wavelengths, since there is a spectral edge at that point.
I don't think there is anything that will make this much faster -- it spends most of its time in spsolve(), and that is already written in C. To make it much faster you would somehow have to avoid solving that matrix, and I don't know the maths well enough to come up with an alternative that doesn't require solving it.
That said, I found two ideas that can make it roughly 36% faster.
The first thing I did was profile the program. I noticed it was spending a lot of time on the spsolve line, so I split it into several lines.
A = diagonal_weights + smoothness_penalies
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec = "NATURAL")
Then I profiled that.
69 1203 569457223.0 473364.3 22.6 A = diagonal_weights + smoothness_penalies
70 1203 4759976.0 3956.8 0.2 b = current_weights * padded_spectrum
71 1203 1352957577.0 1e+06 53.7 baseline[:] = spsolve(A, b, permc_spec = "NATURAL")
The 53.7% of the time spent in spsolve is hard to reduce, for the reasons I just mentioned. However, 22.6% of the time is spent just adding two matrices, which seems crazy to me. I think it is some combination of the overhead of converting DIA to CSC and of slow CSC + CSC addition.
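To illustrate what I mean, here is roughly how that addition could be timed in isolation (my own sketch; the size 4200 just mimics the padded spectrum length):
import timeit
import numpy as np
from scipy.sparse import diags_array, eye_array

n = 4200
differences = eye_array(n, dtype=np.float64, format="csr")
for _ in range(2):
    differences = differences[1:] - differences[:-1]
penalties = 1E6 * (differences.T @ differences)          # plays the role of smoothness_penalies
weights_dia = diags_array(np.ones(n), format="dia")      # plays the role of diagonal_weights
weights_csc = weights_dia.tocsc()
print(timeit.timeit(lambda: weights_dia + penalties, number=100))   # DIA + sparse, includes conversion
print(timeit.timeit(lambda: weights_csc + penalties, number=100))   # CSC + sparse only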
As an alternative, I tried finding the indices of the diagonal elements and their initial values, and then manipulating the diagonal elements directly with NumPy.
# Sparse array implementation
def arpls(
spectrum: NDArray[np.floating],
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
differentiation_order: int = 2,
max_iterations: int = 100,
edge_padding: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
### Parameters
- spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
- smoothing_factor: `float`, optional -- Smoothing strength parameter.\
The larger the parameter, the smoother the resulting baseline.\
High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
- convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- max_iterations: `int`, optional -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
- edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
### Returns
- `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Raises
- `ValueError` -- If the ratio is not between 0 and 1.
### Reference
- https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
# Step 2: Initialize the weights array.
current_weights = np.ones_like(padded_spectrum)
# Step 3: Initialize a difference array for enforcing smoothness
differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format = "csr")
for _ in range(differentiation_order):
# Update the difference array to account for higher-order differentials if required
differences = differences[1:] - differences[:-1]
# Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
smoothness_penalies = smoothness_penalies.tocoo()
smoothness_penalies.sum_duplicates()
# smoothness_penalies must have canonical format, or changing to csr could change
# data order
assert smoothness_penalies.has_canonical_format
initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
initial_diag = smoothness_penalies.data[initial_diag_index]
smoothness_penalies = smoothness_penalies.tocsr()
assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
A = smoothness_penalies.copy()
# Step 5: Iteratively refine the estimated baseline
baseline = np.zeros_like(padded_spectrum)
for _ in range(max_iterations):
# Step 5a: Generate a diagonal weights array from the current weights
A.data[initial_diag_index] = initial_diag + current_weights
# Step 5b: Solve for the baseline using the current weights and smoothness penalties
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec = "NATURAL")
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5f: Check for convergence by measuring how much the weights have changed
if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
current_weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
This is roughly 26% faster.
Next, I looked at the construction of the smoothness_penalies matrix. The main thing to note about this matrix is that it depends on three things: the size of the spectrum, the smoothing factor, and the differentiation order. These are the same for every call to arpls(). Since arpls() is called many times, it is a good candidate for memoization.
To do this, first move the creation of smoothness_penalies into a separate function, then apply the functools.lru_cache() decorator. This cuts the time spent setting up smoothness_penalies from 16% of the runtime down to 0.2%.
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
from functools import lru_cache
@lru_cache
def initialize_smoothness_penalies(size, differentiation_order, smoothing_factor):
# Step 3: Initialize a difference array for enforcing smoothness
differences: csr_array = eye_array(size, dtype=np.float64, format = "csr")
for _ in range(differentiation_order):
# Update the difference array to account for higher-order differentials if required
differences = differences[1:] - differences[:-1]
# Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
smoothness_penalies = smoothness_penalies.tocoo()
smoothness_penalies.sum_duplicates()
# smoothness_penalies must have canonical format, or changing to csr could change
# data order
assert smoothness_penalies.has_canonical_format
initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
initial_diag = smoothness_penalies.data[initial_diag_index]
smoothness_penalies = smoothness_penalies.tocsr()
assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
return smoothness_penalies, initial_diag, initial_diag_index
# Sparse array implementation
def arpls(
spectrum: NDArray[np.floating],
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
differentiation_order: int = 2,
max_iterations: int = 100,
edge_padding: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
### Parameters
- spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
- smoothing_factor: `float`, optional -- Smoothing strength parameter.\
The larger the parameter, the smoother the resulting baseline.\
High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
- convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- max_iterations: `int`, optional -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
- edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
### Returns
- `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Raises
- `ValueError` -- If the ratio is not between 0 and 1.
### Reference
- https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
# Step 2: Initialize the weights array.
current_weights = np.ones_like(padded_spectrum)
smoothness_penalies, initial_diag, initial_diag_index = initialize_smoothness_penalies(padded_spectrum.shape[0], differentiation_order, smoothing_factor)
A = smoothness_penalies.copy()
# Step 5: Iteratively refine the estimated baseline
baseline = np.zeros_like(padded_spectrum)
for _ in range(max_iterations):
# Step 5a: Generate a diagonal weights array from the current weights
A.data[initial_diag_index] = initial_diag + current_weights
# Step 5b: Solve for the baseline using the current weights and smoothness penalties
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec = "NATURAL")
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5f: Check for convergence by measuring how much the weights have changed
if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
current_weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
This includes the previous optimization and is roughly 36% faster than the original implementation.
The results are within 1e-7 of the original, which is caused by changing the A input of spsolve from CSC to CSR.
You could rewrite initialize_smoothness_penalies to produce CSC instead, but that slows spsolve down.
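If you want to verify that yourself, a quick spot check could look like the sketch below (arpls_old and arpls_new are placeholder names for the original and the CSR-based implementations):
import numpy as np

rng = np.random.default_rng(0)
spectrum = rng.normal(loc=100.0, scale=5.0, size=2000).astype(np.float32)
# arpls_old / arpls_new: hypothetical names for the two implementations being compared
difference = np.max(np.abs(arpls_old(spectrum) - arpls_new(spectrum)))
print(difference)   # expected to be on the order of 1e-7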
Here is a set of ideas I investigated but could not get working. You may have better luck with them.
Thanks to your guidance, I was able to improve its performance significantly. I've posted an answer with the updated code for reference.
I tried a few iterative solvers, but judging by the results I think that would require a serious rewrite of the algorithm... As for your second point about further work, having looked at a few of the papers that umfpack references, I don't think it is worth the effort of compiling it.
A simple and naive way to speed up this computation is to run it on multiple cores using multiple processes. Here is an example:
import joblib
import numpy as np

def parallel_arpls(spectra):
    # Fit each spectrum's baseline in a separate worker process, using all available CPU cores
    results = joblib.Parallel(n_jobs=-1)(joblib.delayed(arpls)(row) for row in spectra)
    return np.array(results)
Here are the performance results on an 8-core Ryzen 7 5700U CPU:
%timeit -n 1 np.apply_along_axis(arpls, axis=1, arr=spectra)
1.81 s ± 57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit -n 1 parallel_arpls(spectra)
484 ms ± 66.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
This is 3.7 times faster. It doesn't scale very well, but it has the advantage of being very simple.
I think that to get significantly better performance, one would have to write a faster implementation of spsolve. However, that is certainly a huge amount of work (even for an experienced developer). An alternative would be to implement a different algorithm with fewer iterations or a more direct approach (which requires a good understanding of the underlying mathematics).
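One untested idea in that "more direct" direction, for whoever wants to pursue it: the system matrix W + lambda * D.T @ D is symmetric and banded (the bandwidth equals the differentiation order), so a dedicated banded Cholesky solver such as scipy.linalg.solveh_banded could in principle replace the general sparse spsolve. The sketch below reuses the ARPLS update from the answers above; it is not validated or benchmarked, only a starting point:
import numpy as np
from scipy.linalg import norm, solveh_banded
from scipy.sparse import eye_array

def build_banded_penalty(size: int, differentiation_order: int, smoothing_factor: float) -> np.ndarray:
    # Upper banded storage (the layout solveh_banded expects) of smoothing_factor * D.T @ D,
    # which is symmetric with bandwidth equal to the differentiation order.
    differences = eye_array(size, dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        differences = differences[1:] - differences[:-1]
    penalties = (smoothing_factor * (differences.T @ differences)).todia()
    bands = np.zeros((differentiation_order + 1, size))
    for offset, diagonal in zip(penalties.offsets, penalties.data):
        if offset >= 0:  # only the upper triangle is needed for a symmetric matrix
            bands[differentiation_order - offset] = diagonal
    return bands

def arpls_banded(spectrum, smoothing_factor=1E6, convergence_tolerance=0.05,
                 differentiation_order=2, max_iterations=100, edge_padding=100):
    # Same ARPLS iteration as in the answers above, but the linear solve uses a
    # banded Cholesky factorization instead of the general sparse spsolve.
    padded_spectrum = np.pad(np.asarray(spectrum, dtype=np.float64), pad_width=edge_padding, mode="edge")
    bands = build_banded_penalty(padded_spectrum.shape[0], differentiation_order, smoothing_factor)
    weights = np.ones_like(padded_spectrum)
    baseline = padded_spectrum.copy()
    for _ in range(max_iterations):
        ab = bands.copy()
        ab[-1] += weights  # add the diagonal weight matrix W to the main diagonal
        baseline = solveh_banded(ab, weights * padded_spectrum)
        residuals = padded_spectrum - baseline
        negative_residuals = residuals[residuals < 0]
        if not negative_residuals.size:
            break
        nr_mean, nr_deviation = negative_residuals.mean(), negative_residuals.std()
        exponents = np.clip(2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation, -500, 500)
        updated_weights = 1.0 / (1.0 + np.exp(exponents))
        if norm(weights - updated_weights) / norm(weights) < convergence_tolerance:
            break
        weights = updated_weights
    return baseline[edge_padding:-edge_padding]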
Thanks to Nick ODell's suggestions, I was able to improve performance by more than 300% compared to the original code. Here is the updated algorithm for reference:
from functools import lru_cache
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import coo_array, csc_array, csr_array, eye_array
from scipy.sparse.linalg import spsolve
# Only need to cache up to two parameter sets
@lru_cache(maxsize=2)
def generate_penalties(
shape: int,
differentiation_order: int,
smoothing_factor: float,
) -> tuple[NDArray[np.floating], csr_array, NDArray[np.floating], NDArray[np.intp]]:
"""
Generates the initial weights and smoothness penalty matrix used in the ARPLS algorithm.
This function precomputes the structures needed for penalizing deviations during baseline correction, which are reused across iterations.
To optimize performance, it caches the results using `lru_cache`, allowing for reuse if the function is called again with the same parameters.
### Parameters
- shape: `int` -- The length of the spectrum for which penalties are generated.
- differentiation_order: `int` -- The order of the difference operator used in penalization. See the ARPLS docstring for more details.
- smoothing_factor: `float` -- Controls the smoothness of the resulting baseline. See the ARPLS docstring for more details.
### Returns
- current_weights: `NDArray[floating]` -- Initial weights array, all set to 1, indicating uniform weighting before any iterations.
- smoothness_penalties: `csr_array` -- A sparse matrix representing the smoothness penalties to be applied during baseline fitting.
- diagonal: `NDArray[floating]` -- The initial diagonal values of the penalty matrix, stored for efficient updates during iterations.
- diagonal_indices: `NDArray[intp]` -- Indices where diagonal elements are located in the sparse matrix data array.
### Notes
The caching mechanism assumes that the function is called with similar parameters multiple times, which helps in optimizing performance in repeated baseline fitting tasks.
"""
# Initialize the weights array to all ones, indicating no preference for any part of the spectrum initially
weights = np.ones(shape, dtype=np.float32)
# Create a sparse identity matrix and apply the difference operator iteratively to build the smoothness penalty matrix
differences = csr_array(eye_array(shape, dtype=np.float64, format = "csr"))
for _ in range(differentiation_order):
# Subtracts shifted matrices to build higher-order differences
differences = csr_array(differences[1:] - differences[:-1])
# Apply the smoothing factor to the penalty matrix, converting it to a compressed sparse column format for efficient computation
smoothness_penalties = csc_array(smoothing_factor * (differences.T @ differences))
smoothness_penalties = coo_array(smoothness_penalties.tocoo())
# Ensure there are no duplicate entries in the sparse matrix
smoothness_penalties.sum_duplicates()
# Identify and store the diagonal elements for quick updates during baseline fitting iterations
diagonal_indices: NDArray[np.intp] = np.where(smoothness_penalties.row == smoothness_penalties.col)[0]
diagonal = smoothness_penalties.data[diagonal_indices]
# Convert to CSR format for solving the linear system
smoothness_penalties = csr_array(smoothness_penalties.tocsr())
return weights, smoothness_penalties, diagonal, diagonal_indices
def arpls(
spectrum: NDArray[np.floating],
edge_padding: int = 100,
differentiation_order: int = 2,
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
max_iterations: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using the Asymmetrically Reweighted Penalized Least Squares algorithm.
This method iteratively fits a smooth baseline to a given spectrum by minimizing the effect of noise while preserving the original signal's characteristics.
The ARPLS algorithm adjusts the baseline by asymmetrically penalizing negative deviations, making it particularly robust for spectral data with varying baseline shapes.
### Parameters
- spectrum: `NDArray[floating]` -- The 1D spectrum input array.
- edge_padding: `int` -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
- differentiation_order: `int` -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- smoothing_factor: `float` -- Controls the smoothness of the resulting baseline.\
Larger values suppress small fluctuations, which may lead to a smoother, but less sensitive baseline.
- convergence_tolerance: `float` -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- max_iterations: `int` -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
### Returns
- `NDArray[floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Reference
Asymmetrically Reweighted Penalized Least Squares for Baseline Correction: https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to mitigate boundary effects, which helps in obtaining a more accurate baseline
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode = "edge")
# Step 2: Generate initial weights, penalty matrix, and indices for diagonal elements
weights, penalties, diagonal, diagonal_index = generate_penalties(padded_spectrum.shape[0], differentiation_order, smoothing_factor)
# Step 4: Begin iterative refinement of the baseline using updated weights and penalties
for _ in range(max_iterations):
# Step 5a: Update the diagonal elements of the penalty matrix with the current weights.
penalties.data[diagonal_index] = diagonal + weights
# Step 5b: Solve the linear system to obtain the new baseline.
baseline: NDArray[np.floating] = spsolve(penalties, weights * padded_spectrum, permc_spec = "NATURAL", use_umfpack=False)
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Extract the negative residuals, which indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
if not negative_residuals.size:
break # Stop iterating if residuals are all positive
# Step 5e: Calculate statistics of the negative residuals.
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5f: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5g: Check for convergence by measuring how much the weights have changed
if (norm(weights - updated_weights) / norm(weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
# Step 5h: Update the weights for the next iteration
weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
As Jérôme Richard suggested, multithreading will most likely be the next step.
If you use a tool like line_profiler, where does it say most of the time is being spent?
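For reference, a minimal line_profiler setup looks roughly like the sketch below (the script name fit_baselines.py and the toy function are made up; recent line_profiler releases expose the profile decorator directly):
# fit_baselines.py -- sketch of a minimal line_profiler workflow.
# Run with:  kernprof -l -v fit_baselines.py
# to get a per-line timing table like the one shown earlier in this thread.
from line_profiler import profile  # available in recent line_profiler releases

@profile
def hot_function(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    hot_function(1_000_000)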