Предположим, мои данные выглядят так:
data = {
'value': [1,9,6,7,3, 2,4,5,1,9]
}
Для каждой строки я хотел бы найти номер строки последнего предыдущего элемента, больший, чем текущий.
Итак, мой ожидаемый результат:
[None, 0, 1, 2, 1, 1, 3, 4, 1, 0]
1 нет предыдущего элемента, поэтому я хочу None в результате9 по крайней мере такой же большой, как и все его предыдущие элементы, поэтому я хочу, чтобы в результате был 06 имеет предыдущий элемент 9, который больше его. Расстояние между ними 1. Итак, я хочу 1 в результате здесь.Я знаю, что могу сделать это в цикле на Python (или на C/Rust, если напишу расширение).
Мой вопрос: можно ли решить эту проблему, используя полностью операции с данными? панды или полярные животные — подойдет и то, и другое. Но только операции с данными.
Итак, ничего из следующего, пожалуйста:
applymap_elementsmap_rowsiter_rowsЕсли я правильно понял проблему, вы не сможете сделать это с помощью векторизованных операций только для панд. Вам нужно перебирать значения одно за другим.
@Hericks Я ищу расстояние до индекса предыдущего самого большого элемента. Для 7 его индекс равен 3, а индекс 9 (предыдущего по величине элемента) равен 1 и 3-1=2.
Если есть верхняя граница, вы можете сделать что-то вроде pl.coalesce(pl.when(col("value")<col("value").shift(x)).then(lit(x)) for x in range(upper_bound))






Полагаю, вы ищете часть алгоритма для реализации на Rust, поэтому предлагаю вам следующее:
import pandas as pd
import time
import numpy as np
data = {
'value': [1, 9, 6, 7, 3, 2, 4, 5, 1, 9]
}
df = pd.DataFrame(data)
values = df['value'].tolist()
start = time.time()
### Algorithm for implementation in Rust, C ...
_max = values[0]
r = [None]
for i in range(1, len(values)):
prev = values[:i+1][:-1]
last = values[:i+1][-1]
dist=0
_max = max(prev) if last >= _max else _max
for j in range(len(prev)-1, -1, -1):
if last < _max:
dist+=1
else:
r.append(dist)
break
if last < prev[j]:
r.append(dist)
break
end = time.time()
print(end-start)
print(r)
[None, 0, 1, 2, 1, 1, 3, 4, 1, 0]
Чтобы реализовать расчет в фрейме данных после расчета в Rust, Python или чем-то еще:
df['out'] = r
print(df)
value out
0 1 NaN
1 9 0.0
2 6 1.0
3 7 2.0
4 3 1.0
5 2 1.0
6 4 3.0
7 5 4.0
8 1 1.0
9 9 0.0
Реализация Rust (см. PyO3):
(Априори логика алгоритма Python должна быть сохранена)
Онлайн-компилятор: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021
use std::time::Instant;
fn main() {
let values = vec![1, 9, 6, 7, 3, 2, 4, 5, 1, 9];
let mut r: Vec<Option<usize>> = Vec::new();
let mut _max = values[0];
r.push(None);
let start = Instant::now();
for i in 1..values.len() {
let last = values[i];
let mut dist = 0;
// Calculate _max if last is greater than or equal to previous _max
_max = if last >= _max {
last
} else {
*values[..i].iter().max().unwrap()
};
for &value in values[..i].iter().rev() {
if last < _max {
dist += 1;
if last < value {
r.push(Some(dist));
break;
}
} else {
r.push(Some(dist));
break;
}
}
}
let duration = start.elapsed();
println!("Time elapsed is: {:?}", duration);
println!("{:?}", r);
}
Результат (проверено онлайн):
Time elapsed is: 5.02µs
[None, Some(0), Some(1), Some(2), Some(1), Some(1), Some(3), Some(4), Some(1), Some(0)]
Примечание :
Для распараллеливания задач в Rust вы можете использовать radius.rayon, который предоставляет параллельные итераторы для распараллеливания многих задач по обработке данных.
Возможный вариант с использованием pandas / numpy :
# mask & indices
tril = np.tril(arr[:, None] < arr)
last = np.where(tril, ser.index, -1).max(axis=1)
# distance
dist = (ser.index - last).where(last != -1, 0).astype("Int64")
dist.array[0] = np.nan
Выход :
>>> dist.tolist()
# [<NA>, 0, 1, 2, 1, 1, 3, 4, 1, 0]
Используемый вход:
import pandas as pd
import numpy as np
ser = pd.Series(data["value"])
arr = ser.to_numpy()
Я тоже это имел в виду, но это O(n²);)
Тогда это, наверное, плохой подход. Я просто хотел дать ему шанс ;)
Это хороший подход, но он ограничен входными данными разумного размера :)
Использование janitor Conditional_join для выполнения самосоединения с условиями:
import janitor
tmp = df.reset_index()
df['out'] = (tmp
.conditional_join(tmp, ('index', 'index', '>'), ('value', 'value', '<'),
keep='last', how='left', right_columns='index')
[('right', 'index')]
.rsub(tmp.index)
.fillna(pd.Series(0, index=tmp.index[1:])).to_numpy()
)
Выход:
value out
0 1 NaN
1 9 0.0
2 6 1.0
3 7 2.0
4 3 1.0
5 2 1.0
6 4 3.0
7 5 4.0
8 1 1.0
9 9 0.0
это работает, но всего за 100_000 строк мне не хватает памяти. Спасибо хоть!
Векторизировать подобные задачи сложно, но вы можете использовать модуль numba, чтобы ускорить задачу. Также эту проблему можно очень легко распараллелить:
from numba import njit, prange
@njit(parallel=True)
def get_values(values):
out = np.zeros_like(values, dtype=np.float64)
for i in prange(len(values)):
idx = np.int64(i)
v = values[idx]
while idx > -1 and values[idx] <= v:
idx -= 1
if idx > -1:
out[i] = i - idx
out[0] = np.nan
return out
data = {
"value": [1, 9, 6, 7, 3, 2, 4, 5, 1, 9],
"out": [None, 0, 1, 2, 1, 1, 3, 4, 1, 0],
}
df = pd.DataFrame(data)
df["out2"] = get_values(df["value"].values)
print(df)
Распечатки:
value out out2
0 1 NaN NaN
1 9 0.0 0.0
2 6 1.0 1.0
3 7 2.0 2.0
4 3 1.0 1.0
5 2 1.0 1.0
6 4 3.0 3.0
7 5 4.0 4.0
8 1 1.0 1.0
9 9 0.0 0.0
Тест (с 1_000_000 элементов от 1 до 100):
from timeit import timeit
data = {
"value": np.random.randint(1, 100, size=1_000_000),
}
df = pd.DataFrame(data)
t = timeit('df["out"] = get_values(df["value"].values)', globals=globals(), number=1)
print(t)
Печатает на моей машине (AMD 5700x):
0.3559090679627843
Это выполняет итерацию только по диапазону строк, который должен выглядеть. Он не перебирает сами строки в Python. Если ваш начальный bound_range охватывает все случаи, то цикл на самом деле никогда не будет выполняться.
lb=0
bound_range=3
df=df.with_columns(z=pl.lit(None, dtype=pl.UInt64))
while True:
df=df.with_columns(
z=pl.when(pl.col('value')>=pl.col('value').shift(1).cum_max())
.then(pl.lit(0, dtype=pl.UInt64))
.when(pl.col('z').is_null())
.then(
pl.coalesce(
pl.when(pl.col('value')<pl.col('value').shift(x))
.then(pl.lit(x, dtype=pl.UInt64))
for x in range(lb, lb+bound_range)
)
)
.otherwise(pl.col('z'))
)
if df[1:]['z'].drop_nulls().shape[0]==df.shape[0]-1:
break
lb+=bound_range
В этом примере я установил bound_range на 3, чтобы убедиться, что он зацикливается хотя бы один раз. Я запустил это с 1 млн случайных целых чисел от 0 до 9 (включительно) и установил для параметраbound_range значение 50, и это заняло менее 2 секунд. Вы можете сделать это более разумным между циклами, проверяя вещи более явно, но лучший подход будет зависеть от данных.
Предыдущий пост слишком длинный, представляю здесь еще одну версию с тестом скорости:
# Significative speed improvement with this new version
import pandas as pd
import time
import numpy as np
def calculation(values):
start = time.time()
r = [None]
max_value = values[0]
for i in range(1, len(values)):
if values[i] >= max_value:
max_value = values[i]
r.append(0)
else:
dist = 0
for j in range(i - 1, -1, -1):
dist += 1
if values[j] > values[i]:
r.append(dist)
break
end = time.time()
return [r, end - start]
data = {
"value": [1, 9, 6, 7, 3, 2, 4, 5, 1, 9]
}
df = pd.DataFrame(data)
values_10 = df['value'].tolist()
data = {
"value": np.random.randint(1, 100, size=1_000_000),
}
df = pd.DataFrame(data)
values_1M = df['value'].tolist()
Тесты скорости:
c = calculation(values_10)
print(f"values_10 : \n Result : {c[0]} \n Speed : {c[1]}")
# values_10 :
# Result : [None, 0, 1, 2, 1, 1, 3, 4, 1, 0]
# Speed : 5.0067901611328125e-06
c = calculation(values_1M)
print(f"values_1M : \n Result : {c[0]} \n Speed : {c[1]}")
# ...Speed : 0.49079418182373047
Тест скорости с ржавчиной (о реализации см. в предыдущем посте):
use rand::Rng;
use std::time::Instant;
fn calculation(values: &[i32]) -> (Vec<Option<usize>>, f64) {
let start = Instant::now();
let mut r = vec![None];
let mut max_value = values[0];
for i in 1..values.len() {
let current_value = values[i];
if current_value >= max_value {
max_value = current_value;
r.push(Some(0));
} else {
let mut dist = 0;
for j in (0..i).rev() {
dist += 1;
if values[j] > current_value {
r.push(Some(dist));
break;
}
}
}
}
let duration = start.elapsed().as_secs_f64();
(r, duration)
}
fn main() {
// let values_10: Vec<i32> = vec![1, 9, 6, 7, 3, 2, 4, 5, 1, 9];
// For the 1_000_000 random values, we'll use Rust's rand crate
let mut rng = rand::thread_rng();
let values_1m: Vec<i32> = (0..1_000_000).map(|_| rng.gen_range(1..100)).collect();
// let (result_10, speed_10) = calculation(&values_10);
// println!("values_10 : \n Result : {:?} \n Speed : {:?}", result_10, speed_10);
// Uncomment the following lines to run the algorithm on the vector of 1,000,000 random values
let (result_1m, speed_1m) = calculation(&values_1m);
println!("values_1M : \n Result : {:?} \n Speed : {:?}", result_1m, speed_1m);
}
values_10 :
Result : [None, Some(0), Some(1), Some(2), Some(1), Some(1), Some(3), Some(4), Some(1), Some(0)]
Speed : 4.72e-6
...
Speed : 0.082586922
Примечание :
Результат может быть даже значительно лучше при распараллеливании с ржавчиной (искусственный ящик).
Не могли бы вы подробнее рассказать об ожидаемом результате? Для 6 я вижу, что 9 (индекс = 1) является единственным предыдущим элементом, большим, чем 6. Однако для 7, 9 (индекс = 1) по-прежнему является единственным предыдущим элементом, большим, чем 7. Почему ожидаемый результат 2 в этом случае?