У меня много (~ 1 миллион) точек P с неправильным интервалом вдоль одномерной линии. Эти маркируют сегменты линии таким образом, что если точки имеют вид {0, x_a, x_b, x_c, x_d, ...}, сегменты идут от 0-> x_a, x_a-> x_b, x_b-> x_c, x_c- > x_d и т.д. У меня также есть значение y для каждого сегмента, которое я хочу интерпретировать как глубину цвета. Мне нужно построить эту линию как изображение, но может быть только (скажем) 1000 пикселей для представления всей длины линии. Эти пиксели, конечно, соответствуют регулярным интервалам вдоль линии, скажем, в 0..X1, X1..X2, X2..X3 и т. д., Где X1, X2, X3 расположены регулярно. Чтобы определить цвет для каждого пикселя, мне нужно взять среднее значение всех значений y, которые попадают в границы равномерно распределенных пикселей, взвешенных по длине сегмента, попадающего в этот интервал. Также могут быть пиксели, которые не содержат никакого значения в P, и которые просто принимают значение цвета, определенное сегментом, который проходит через весь пиксель.
Это похоже на то, что, вероятно, нужно много делать при анализе изображений. Итак, есть ли имя для этой операции и какой самый быстрый способ в numpy вычислить такой регулярный набор средних значений y? Полагаю, это немного похоже на интерполяцию, только я хочу брать не только среднее значение двух окружающих точек, а средневзвешенное значение всех точек в пределах регулярного интервала (плюс небольшое перекрытие).
[Edit - добавлен минимальный пример]
Скажем, есть 5 сегментов вдоль горизонтальной линии, разделенных [0, 1.1, 2.2, 2.3, 2.8, 4] (т.е. линия идет от 0 до 4). Предположим, что каждый из сегментов принимает произвольные значения затенения, например, мы могли бы иметь 5 значений затенения [0,0.88,0.55,0.11,0.44], где 0 - черный, а 1 - белый. Затем, если бы я хотел построить это, используя 4 пикселя, мне нужно было бы создать 4 значения, от 0 ... 1, 1 ... 2 и т.д., и ожидал бы, что расчет вернет следующие значения для каждого:
0 ... 1 = 0 (это покрывается первым отрезком линии, 0-> 1.1)
1 ... 2 = 0,1 * 0 + 0,9 * 0,88 (1 ... 1,1 покрывается первым отрезком линии, остальное - вторым)
2 ... 3 = 0,2 * 0,88, 0,1 * 0,55 + 0,5 * 0,11 + 0,2 * 0,44 (это покрывается сегментами со второго по пятый)
3 ... 4 = 0,44 (это покрывается последним отрезком линии, 2,8-> 4)
Если бы я хотел уместить эти данные в строку длиной 2 пикселя, эти 2 пикселя имели бы следующие значения:
0 ... 2 = 1,1 / 2 * 0 + 0,9 / 2 * 0,88
2 ... 4 = 0,2 / 2 * 0,88 + 0,1 / 2 * 0,55 + 0,5 / 2 * 0,11 + 1,2 * 0,44
Это кажется «правильным» способом понижения дискретизации по 1d линии. Я ищу быструю реализацию (в идеале что-то встроенное), когда у меня есть (скажем) миллион точек вдоль линии и только 1000 (или около того) пикселей, чтобы уместить их.
Вопросы обычно лучше получать, когда вы уже проделали тяжелую работу по созданию входных данных, некоторого (псевдокода) и ожидаемого результата. См. минимальный воспроизводимый пример
@DanielF - спасибо. Я добавил числовой пример
Я думал, что у меня есть для вас умный ответ, но вместо этого мне пришлось задать другой вопрос: stackoverflow.com/q/52319504/2988730
Учитывая ваше ключевое требование, заключающееся в том, что вам не нужна линейная интерполяция, вам следует взглянуть на использование scipy.signal.resample.
Это преобразует ваш сигнал в спектр, а затем в новый временной ряд, который равномерно разнесен по оси x.
Также см. Этот вопрос: Как равномерно передискретизировать неоднородный сигнал с помощью scipy.
Спасибо. Однако похоже, что этот метод пытается выбрать одну точку с заданным значением, предполагая, что сигнал более или менее плавный. Мне нужно среднее значение по диапазону (а не точечная оценка), а значения Y могут вообще не изменяться плавно, поэтому подгонка кривой кажется (а) не совсем подходящей для произвольных (негладких, непериодических ) сигналы и (б) чрезмерно сложный / ресурсоемкий для того, что кажется простой (линейной) задачей.
Вы все еще можете сделать это с помощью линейной интерполяции. Хотя ваша функция кусочно-постоянная, вам нужно ее среднее значение по целому ряду небольших интервалов. Среднее значение некоторой функции ж (Икс) в интервале от а до б - это просто ее интеграл в этом диапазоне, деленный на разность между а и б. Интеграл от кусочно-постоянной функции будет кусочно-линейной функцией. Итак, предположим, что у вас есть данные:
x = [0, 1.1, 2.2, 2.3, 2.8, 4]
y = [0, 0.88, 0.55, 0.11, 0.44]
Создайте функцию, которая даст свой интеграл при любом значении Икс. Здесь массив I
будет содержать значение интеграла для каждого из заданных вами значений Икс, а функция f
- это его линейный интерполянт, который даст точное значение в любой точке:
I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I)
Теперь легко оценить среднее значение по каждому пикселю:
pix_x = numpy.linspace(0, 4, 5)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
Мы можем проверить, что находится в этих массивах:
>>> pix_x
array([0., 1., 2., 3., 4.])
>>> pix_y
array([0. , 0.792, 0.374, 0.44 ])
Значения затенения для ваших пикселей теперь находятся в pix_y
. Они должны точно соответствовать значениям, которые вы указали выше в своем примере.
Это должно быть довольно быстро даже для многих, многих точек:
def test(x, y):
I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I,
bounds_error=False, fill_value=(0, I[-1]))
pix_x = numpy.linspace(0, 1, 1001)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
return pix_y
timeit
сообщает:
225 ms ± 37.6 ms per loop
в моей системе, когда x имеет размер 1000000 (и y размера 999999). Обратите внимание, что bounds_error=False
и fill_value=(0, I[-1])
передаются в interp1d
. В результате предполагается, что ваша функция затенения равна нулю за пределами диапазона значений Икс. Кроме того, interp1d
не требует сортировки входных значений; в приведенном выше тесте я дал и Икс, и у как массивы однородных случайных чисел от 0 до 1. Однако, если вы точно знаете, что они отсортированы, вы можете пройти assume_sorted=True
, и вы должны получить увеличение скорости:
20.2 ms ± 377 µs per loop
Сегодня я проведу несколько тестов с нашими двумя алгоритмами. Я подозреваю, что ваш будет намного быстрее :)
Как и следовало ожидать, существует простое решение этой проблемы. Уловка состоит в том, чтобы грамотно смешать np.searchsorted
, который поместит вашу обычную сетку в ближайший бункер оригинала, и np.add.reduceat
для вычисления сумм бинов:
import numpy as np
def distribute(x, y, n):
"""
Down-samples/interpolates the y-values of each segment across a
domain with `n` points. `x` represents segment endpoints, so should
have one more element than `y`.
"""
y = np.asanyarray(y)
x = np.asanyarray(x)
new_x = np.linspace(x[0], x[-1], n + 1)
# Find the insertion indices
locs = np.searchsorted(x, new_x)[1:]
# create a matrix of indices
indices = np.zeros(2 * n, dtype=np.int)
# Fill it in
dloc = locs[:-1] - 1
indices[2::2] = dloc
indices[1::2] = locs
# This is the sum of every original segment a new segment touches
weighted = np.append(y * np.diff(x), 0)
sums = np.add.reduceat(weighted, indices)[::2]
# Now subtract the adjusted portions from the right end of the sums
sums[:-1] -= (x[dloc + 1] - new_x[1:-1]) * y[dloc]
# Now do the same for the left of each interval
sums[1:] -= (new_x[1:-1] - x[dloc]) * y[dloc]
return new_x, sums / np.diff(new_x)
seg = [0, 1.1, 2.2, 2.3, 2.8, 4]
color = [0, 0.88, 0.55, 0.11, 0.44]
seg, color = distribute(seg, color, 4)
print(seg, color)
Результат
[0. 1. 2. 3. 4.] [0. 0.792 0.374 0.44 ]
Это именно то, что вы ожидали от ручных вычислений.
Контрольные точки
Я провел следующий набор тестов, чтобы убедиться, что и Решение EE_, и мой согласились с ответом, и проверить тайминги. Я немного изменил другое решение, чтобы иметь тот же интерфейс, что и мой:
from scipy.interpolate import interp1d
def EE_(x, y, n):
I = np.zeros_like(x)
I[1:] = np.cumsum(np.diff(x) * y)
f = interp1d(x, I, bounds_error=False, fill_value=(0, I[-1]))
pix_x = np.linspace(x[0], x[-1], n + 1)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
return pix_x, pix_y
А вот и тестовый стенд (метод MadPhysicist
просто переименован из функции distribute
выше). На входе всегда 1001 элемент для x
и 1000 элементов для y
. Выходные числа: 5, 10, 100, 1000, 10000:
np.random.seed(0x1234ABCD)
x = np.cumsum(np.random.gamma(3.0, 0.2, size=1001))
y = np.random.uniform(0.0, 1.0, size=1000)
tests = (
MadPhysicist,
EE_,
)
for n in (5, 10, 100, 1000, 10000):
print(f'N = {n}')
results = {test.__name__: test(x, y, n) for test in tests}
for name, (x_out, y_out) in results.items():
print(f'{name}:\n\tx = {x_out}\n\ty = {y_out}')
allsame = np.array([[np.allclose(x1, x2) and np.allclose(y1, y2)
for x2, y2 in results.values()]
for x1, y1 in results.values()])
print()
print(f'Result Match:\n{allsame}')
from IPython import get_ipython
magic = get_ipython().magic
for test in tests:
print(f'{test.__name__}({n}):\n\t', end='')
magic(f'timeit {test.__name__}(x, y, n)')
Я пропущу распечатки данных и договоров (результаты идентичны), а время покажу:
N = 5
MadPhysicist: 50.6 µs ± 349 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 110 µs ± 568 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 10
MadPhysicist: 50.5 µs ± 732 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 111 µs ± 635 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 100
MadPhysicist: 54.5 µs ± 284 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 114 µs ± 215 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 1000
MadPhysicist: 107 µs ± 5.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 148 µs ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 10000
MadPhysicist: 458 µs ± 2.21 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
EE_: 301 µs ± 4.57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Вы можете видеть, что при меньших размерах вывода решение numpy работает намного быстрее, вероятно, потому, что преобладают накладные расходы. Однако при большем количестве точек останова решение scipy становится намного быстрее. Чтобы получить реальное представление о том, как работает синхронизация, вам придется сравнивать разные размеры входных данных, а не только разные выходные размеры.
Отлично, большое спасибо. Я попробую сравнить его с ответом @EE_ ниже, но здорово иметь 2 подхода
@ user2667066. Я добавил несколько идей и результатов для сравнительного анализа
Отлично, спасибо @ mad-Physicist. Я намеревался это сделать, но не нашел для этого времени. Я отмечу это как предпочтительный, поскольку в нем перечислены оба решения.
Думаю, небольшое изображение исходных данных поможет распознать проблему, которую вы пытаетесь решить.