Если вы из будущего, надеюсь этот пиар уже слит.
Если вы не из будущего, надеюсь, этот ответ решит вашу проблему.
Я хочу решить свою проблему только с помощью поляров (в которых я не эксперт, но могу следить за тем, что происходит), прежде чем просто копировать предложенную выше интеграцию DuckDB и сравнивать результаты с моими реальными данными.
У меня есть список событий (имя и отметка времени) и список временных окон. Я хочу подсчитать, сколько каждого события происходит в каждом временном окне.
Я чувствую, что близок к тому, чтобы получить что-то, что работает правильно, но я застрял на пару часов:
import polars as pl
events = {
"name": ["a", "b", "a", "b", "a", "c", "b", "a", "b", "a", "b", "a", "b", "a", "b", "a", "b", "a", "b"],
"time": [0.0, 1.0, 1.5, 2.0, 2.25, 2.26, 2.45, 2.5, 3.0, 3.4, 3.5, 3.6, 3.65, 3.7, 3.8, 4.0, 4.5, 5.0, 6.0],
}
windows = {
"start_time": [1.0, 2.0, 3.0, 4.0],
"stop_time": [3.5, 2.5, 3.7, 5.0],
}
events_df = pl.DataFrame(events).sort("time").with_row_index()
windows_df = (
pl.DataFrame(windows)
.sort("start_time")
.join_asof(events_df, left_on = "start_time", right_on = "time", strategy = "forward")
.drop("name", "time")
.rename({"index": "first_index"})
.sort("stop_time")
.join_asof(events_df, left_on = "stop_time", right_on = "time", strategy = "backward")
.drop("name", "time")
.rename({"index": "last_index"})
)
print(windows_df)
"""
shape: (4, 4)
┌────────────┬───────────┬─────────────┬────────────┐
│ start_time ┆ stop_time ┆ first_index ┆ last_index │
│ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ f64 ┆ u32 ┆ u32 │
╞════════════╪═══════════╪═════════════╪════════════╡
│ 2.0 ┆ 2.5 ┆ 3 ┆ 7 │
│ 1.0 ┆ 3.5 ┆ 1 ┆ 10 │
│ 3.0 ┆ 3.7 ┆ 8 ┆ 13 │
│ 4.0 ┆ 5.0 ┆ 15 ┆ 17 │
└────────────┴───────────┴─────────────┴────────────┘
"""
На данный момент для каждого временного окна я могу получить индекс первого и последнего события, которое меня волнует. Теперь мне «просто» нужно посчитать, сколько их каждого типа. Могу ли я получить помощь о том, как это сделать?
Результат, который я ищу, должен выглядеть так:
shape: (4, 5)
┌────────────┬───────────┬─────┬─────┬─────┐
│ start_time ┆ stop_time ┆ a ┆ b ┆ c │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ f64 ┆ i64 ┆ i64 ┆ i64 │
╞════════════╪═══════════╪═════╪═════╪═════╡
│ 1.0 ┆ 3.5 ┆ 4 ┆ 5 ┆ 1 │
│ 2.0 ┆ 2.5 ┆ 2 ┆ 2 ┆ 1 │
│ 3.0 ┆ 3.7 ┆ 3 ┆ 3 ┆ 0 │
│ 4.0 ┆ 5.0 ┆ 2 ┆ 1 ┆ 0 │
└────────────┴───────────┴─────┴─────┴─────┘
Мне хочется использовать что-то вроде int_ranges() , сбора() и взрыв() , чтобы получить фрейм данных с каждым временным окном и всеми соответствующими событиями. Наконец, что-то вроде group_by() , count() и Pivot() может привести меня к нужному фрейму данных. Но я боролся с этим некоторое время.
@RomanPekar Да, в ответе DuckDB, который я цитировал выше (это ваш, хе-хе), обратите внимание, что я спрашивал, потому что решил это с помощью cross
join, но размер кадра данных увеличивается, и мой процесс прекращается.
Не уверен, что это будет более производительно, но вы можете превратить свой windows_df
в желаемый результат с помощью:
first_index
до last_index
.join()
, чтобы снова присоединиться к events_df
.(
windows_df
.with_columns(index = pl.int_ranges(pl.col.first_index, pl.col.last_index, dtype=pl.UInt32))
.explode("index")
.join(events_df, on = "index", how = "inner")
.pivot(on = "name", index=["start_time","stop_time"], aggregate_function = "len", values = "index")
.fill_null(0)
)
┌────────────┬───────────┬─────┬─────┬─────┐
│ start_time ┆ stop_time ┆ a ┆ b ┆ c │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ f64 ┆ u32 ┆ u32 ┆ u32 │
╞════════════╪═══════════╪═════╪═════╪═════╡
│ 2.0 ┆ 2.5 ┆ 1 ┆ 2 ┆ 1 │
│ 1.0 ┆ 3.5 ┆ 4 ┆ 4 ┆ 1 │
│ 3.0 ┆ 3.7 ┆ 2 ┆ 3 ┆ 0 │
│ 4.0 ┆ 5.0 ┆ 1 ┆ 1 ┆ 0 │
└────────────┴───────────┴─────┴─────┴─────┘
Вы не представляете, как долго я боролся с этим. Спасибо. Обратите внимание, что вместо pl.col.last_index
должно быть pl.col.last_index + 1
. Но это только потому, что я получал последний инклюзивный индекс. Еще раз спасибо.
У меня есть еще одна идея о том, как это можно рассчитать, немного по-другому (и, возможно, быстрее), но сейчас я на мобильном телефоне, проверю ее позже.
не могли бы вы указать мне на документы для pl.col.first_index
, так как я искал, но ничего не нашел (вероятно, я ищу неправильно)
@sammywemmy Это то же самое, что и pl.col("first_index")
(добавлен атрибут Polars -> диспетчеризация имени столбца для соответствия поведению pandas df.column_name после нескольких запросов)
Вы можете избежать использования нескольких join_asofs и использовать search_sorted
— именно так вы в любом случае будете обрабатывать соединение диапазона, если ваши данные соответствуют сценарию. Ваши данные отсортированы по столбцу time
, поэтому воспользуйтесь бинарным поиском, чтобы узнать начало и конец. К сожалению, explode
нельзя избежать (возможно, существует более производительный маршрут, о котором я не знаю) - было бы намного лучше/быстрее, если бы можно было избежать части explode
, а агрегирование выполнялось в начале и конце - это могло бы потребовать вхожу в ржавчину и делаю это.
time = events.get_column('time')
stop_time = windows.get_column('stop_time')
start_time = windows.get_column('start_time')
starts = time.search_sorted(start_time, side='left')
ends = time.search_sorted(stop_time, side='right')
indices = pl.int_ranges(starts, ends, dtype=pl.UInt32)
(windows
.with_columns(indices=indices)
# ideally would love to avoid the explosion here
# iterating in a low level language and aggregating
# should offer more performance
.explode('indices')
.join(events.with_row_index(name='indices'), on='indices')
.pivot(
index=['start_time', 'stop_time'],
on='name',
aggregate_function='len',
values='time')
.fill_null(0)
)
shape: (4, 5)
┌────────────┬───────────┬─────┬─────┬─────┐
│ start_time ┆ stop_time ┆ b ┆ a ┆ c │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ f64 ┆ u32 ┆ u32 ┆ u32 │
╞════════════╪═══════════╪═════╪═════╪═════╡
│ 1.0 ┆ 3.5 ┆ 5 ┆ 4 ┆ 1 │
│ 2.0 ┆ 2.5 ┆ 2 ┆ 2 ┆ 1 │
│ 3.0 ┆ 3.7 ┆ 3 ┆ 3 ┆ 0 │
│ 4.0 ┆ 5.0 ┆ 1 ┆ 2 ┆ 0 │
└────────────┴───────────┴─────┴─────┴─────┘
Вы также можете добиться результатов с помощью gather
и implode
и explode
- снова explode
поднимает голову - я бы пошел по пути join
, поскольку он более явный и ясный, чего мы пытаемся достичь:
imploded=events.select(pl.struct(pl.all()).implode())
(windows
.with_columns(imploded, indices=indices)
.select('start_time','stop_time',
pl.col.name.list.gather(pl.col.indices))
.explode('name')
.unnest('name')
.pivot(on='name',
values='time',
aggregate_function='len')
.fill_null(0)
)
shape: (4, 5)
┌────────────┬───────────┬─────┬─────┬─────┐
│ start_time ┆ stop_time ┆ b ┆ a ┆ c │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ f64 ┆ u32 ┆ u32 ┆ u32 │
╞════════════╪═══════════╪═════╪═════╪═════╡
│ 1.0 ┆ 3.5 ┆ 5 ┆ 4 ┆ 1 │
│ 2.0 ┆ 2.5 ┆ 2 ┆ 2 ┆ 1 │
│ 3.0 ┆ 3.7 ┆ 3 ┆ 3 ┆ 0 │
│ 4.0 ┆ 5.0 ┆ 1 ┆ 2 ┆ 0 │
└────────────┴───────────┴─────┴─────┴─────┘
В любом случае, если можете, используйте двоичный поиск для объединения диапазонов, это видно по производительности.
Один из простых способов сделать это в полярах — просто запустить
cross
присоединиться и отфильтровать события, которые находятся за пределами окна.