У меня есть приложение, в котором мне нужно работать с очень большим объемом данных. Я читал, что поляры должны иметь возможность работать с наборами данных, превышающими доступную память, поэтому я, должно быть, делаю что-то не так...
В моем сценарии приложение работает в контейнере Kubernetes, где я настроил его на использование 12 ГБ памяти. Я пытаюсь работать с данными из CSV с 200 миллионами записей, примерно ~9 ГБ на диске.
Основной формат этого CSV следующий:
id,aligned_value,date_reference,model,predict,segment
00000000001,564,20240520,XPTO,,3
00000000002,741,20240520,XPTO,,6
00000000003,503,20240520,XPTO,,5
00000000004,200,20240520,XPTO,,0
Я пытаюсь запустить простую и простую агрегацию для подсчета уникальных значений этого набора данных, сгруппированных по двум полям («aligned_value», которое варьируется от 0 до 1000, и сегменту, который идет от 0 до 6). Но когда я запускаю следующий код, потребление памяти увеличивается и увеличивается, пока контейнер просто не будет уничтожен.
def get_summarized_base_df(self, filepath: str) -> pl.DataFrame:
"""
Summarizes the base on a dataframe grouped by all fields included
on this report's setup
"""
# This part will return a list of fields, which for this scenario should be just ["aligned_value", "segment"]
required_fields = self.list_all_required_fields()
base_lf = pl.scan_csv(filepath)
summarized_base_df = base_lf.group_by(required_fields).agg(pl.count()).collect()
return summarized_base_df
Есть ли параметры, которые я мог бы использовать для уменьшения использования памяти? Я неправильно использую фреймворк?
Для справки, я пытался ограничить использование памяти поляров, установив переменную среды «POLARS_MAX_MEMORY_MIB», но, похоже, это не имело никакого значения.
Некоторая дополнительная информация:
Версия Python: 3.10.11 Версия Поларса: 0.20.18
Для точек стиля вы можете использовать .len()
вместо .agg(pl.count())
, но реальная проблема заключается в том, что потоковая обработка наборов данных (которая необходима для объемов, превышающих объем памяти) во многих местах все еще находится в стадии эксперимента, поэтому она включена. Вы можете принять участие, передав streaming=True
, чтобы получить:
base_lf.group_by(required_fields).len().collect(streaming=True)
API потоковой передачи отлично работал в моем сценарии. Большое спасибо. А ещё я поменял с "count" на "len", спасибо за совет!