Как обрабатывать большие наборы данных xarray/dask, чтобы минимизировать время вычислений или нехватку памяти для хранения в виде ежегодных файлов (набор данных ERA5)

В настоящее время я использую данные ERA5 для расчета переменных, связанных с ветром. Хотя я могу вычислить то, что хочу, у меня возникают проблемы с эффективной реализацией, позволяющей переносить эти тяжелые данные реальным способом. Эти данные хранятся на «суперкомпьютере», к которому я могу получить доступ и запустить свой сценарий. Доступны несколько вариантов ЦП и памяти, но большую часть времени я использую либо 4 ЦП/18 ГБ, либо 7 ЦП/32 ГБ. Данные поступают в ежемесячные файлы, которые я открываю как годовые. Общее количество лет — 74, но я решил, что лучше подходить из года в год.

Вот мой код:

# Imports
import geopandas as gpd
from osgeo import gdal
import numpy as np
import matplotlib.pyplot as plt
import glob
import pickle
import xarray as xr
import time 

# setting up my workers (e.g. 7)
from dask.distributed import Client
client = Client(n_workers=7)
client

# Declaring variables
directory = 'PATH_TO_LOCAL_DIR'

years  = np.arange(1950,2023+1)

lon0 =  129.7
lat0 = - 27.9
lonbounds = [113.0,145.0]
latbounds = [-36.0,-16.0]

d     = 300e-6
z     = 10
g     = 9.8
z0    = 1e-2
rho_s = 2650
rho_f = 1.2
kappa = 0.4
phi = 0.6
fluxconstant      = 5
thresholdconstant = 0.082
usimths = np.linspace(0.01,0.8,80,dtype=np.float32)
r = 6371.229*1e+3 
s2yr = 1/60/60/24/365

uabinwidth = 3*np.pi/180
uabine = np.arange(-180*np.pi/180,(180+1e-4)*np.pi/180,uabinwidth)
uabinm = uabine[1:]-uabinwidth/2


# Opening the data. Currently I just grabe the first year worth of data
era5  = xr.open_mfdataset(glob.glob('u10_era5-land_oper_sfc_%d*.nc'%(years[i],years[i])))
era5  = era5.reindex(latitude=list(reversed(era5.latitude)))
era5  = era5.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))       
era5v = xr.open_mfdataset(glob.glob('v10_era5-land_oper_sfc_%d*.nc'%(years[i],years[i])))
era5v = era5v.reindex(latitude=list(reversed(era5v.latitude)))
era5v = era5v.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
era5  = era5.merge(era5v)

Результат работыera5 на этом этапе:

 <xarray.Dataset>
 Dimensions:    (longitude: 321, latitude: 201, time: 8759)
 Coordinates:
   * longitude  (longitude) float32 113.0 113.1 113.2 113.3 ... 144.8 144.9 145.0
   * latitude   (latitude) float32 -36.0 -35.9 -35.8 -35.7 ... -16.2 -16.1 -16.0
   * time       (time) datetime64[ns] 1950-01-01T01:00:00 ... 1950-12-31T23:00:00
 Data variables:
     u10        (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     v10        (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>

Дальше я делаю:

# Calculation 
era5 = era5.assign(ua_from=np.arctan2(-era5.v10,-era5.u10))#.compute() <--- not sure if compute in between is a good idea?

for j in range(len(usimths)):
    era5 = era5.assign(temp=(((era5.u10**2+era5.v10**2)*(kappa/np.log(z/z0))**2-usimths[j]**2)*fluxconstant*usimths[j]/g*rho_f/rho_s).where(((era5.u10**2+era5.v10**2)**0.5*(kappa/np.log(z/z0)))>usimths[j],0))#.compute()
    era5['qm_{}'.format(str(round(usimths[j],2)))] = era5.temp
    era5 = era5.drop_vars('temp')

Результат работыera5:

 Output of era5:
 <xarray.Dataset>
 Dimensions:    (longitude: 321, latitude: 201, time: 8759)
 Coordinates:
   * longitude  (longitude) float32 113.0 113.1 113.2 113.3 ... 144.8 144.9 145.0
   * latitude   (latitude) float32 -36.0 -35.9 -35.8 -35.7 ... -16.2 -16.1 -16.0
   * time       (time) datetime64[ns] 1950-01-01T01:00:00 ... 1950-12-31T23:00:00
 Data variables: (12/83)
     u10        (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     v10        (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     ua_from    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.01    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.02    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.03    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     ...         ...
     qm_0.75    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.76    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.77    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.78    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.79    (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
     qm_0.8     (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>

Окончательно:

# this part takes some time and I am still unsure if I should compute or not in between
ua_from = era5.ua_from.compute().values #<-- need these for below
bin_arrays_mean = []
bin_arrays_sum = []
for ident in range(len(uabine)-1):
    era5tempid = era5.drop_vars(["u10","v10","ua_from"])
    era5tempid = era5tempid.where((ua_from >= uabine[ident]) & (ua_from < uabine[ident+1]) & (era5tempid > 0))
    bin_arrays_mean.append(era5tempid.mean(dim = "time", skipna=True))#.compute())
    bin_arrays_sum.append(era5tempid.sum(dim = "time", skipna=True))#.compute())

# merge and rearrange for desired dataset
era5ang_mean = xr.concat(bin_arrays_mean,dim = "ang_bins").assign_coords({"ang_bins":np.arange(120, dtype=np.uint32)})
era5ang_sum = xr.concat(bin_arrays_sum,dim = "ang_bins").assign_coords({"ang_bins":np.arange(120, dtype=np.uint32)})

qm_arrays_mean = []
for var in era5ang_mean:
    qm_arrays_mean.append(era5ang_mean[var].rename("qm"))

qm_arrays_sum = []
for var in era5ang_sum:
    qm_arrays_sum.append(era5ang_sum[var].rename("qm"))

era5_bins_mean = xr.concat(qm_arrays_mean, dim = "usimths").assign_coords({"usimths":usimths})
era5_bins_sum = xr.concat(qm_arrays_sum, dim = "usimths").assign_coords({"usimths":usimths})

era5_bins_comb = xr.Dataset({"qmmean":era5_bins_mean, "qmsum":era5_bins_sum})

Выходные данныеera5_bins_comb (ПРИМЕЧАНИЕ. Выходные данные ниже (вычислены) на основе описанного выше, но с выбранной ТОЛЬКО одной долготой, что заняло около часа):

<xarray.Dataset>
Dimensions:    (latitude: 201, ang_bins: 120, usimths: 80)
Coordinates:
    longitude  float32 113.0
  * latitude   (latitude) float32 -36.0 -35.9 -35.8 -35.7 ... -16.2 -16.1 -16.0
  * ang_bins   (ang_bins) uint32 0 1 2 3 4 5 6 7 ... 113 114 115 116 117 118 119
  * usimths    (usimths) float32 0.01 0.02 0.03 0.04 0.05 ... 0.77 0.78 0.79 0.8
Data variables:
    qmmean     (usimths, ang_bins, latitude) float32 nan nan nan ... nan nan nan
    qmsum      (usimths, ang_bins, latitude) float32 0.0 0.0 0.0 ... 0.0 0.0 0.0

На последнем шаге я хочу сохранить файл, используя era5_bins_comb.to_netcdf().

Боюсь, я не слишком хорошо знаком с тем, как работают xarray и dask. Конкретно, когда и когда не использовать .compute(). Попробовав пару вещей с вычислениями между вышеперечисленными, я оказался там. Часть, которая занимает больше всего времени (например, час или больше), — это цикл for ident in range(len(uabine)-1):. Остальное работает довольно быстро (наверное, потому что до этого я ничего не вычисляю). Кроме того, последняя часть сохранения набора данных для его временного хранения и объединения с остальными 73 годами позже также кажется сложной. Похоже, у меня может не хватить памяти и/или это займет вечность. Последнее, что я тестировал, — это использовать era5_bins_comb.isel(usimth=0).compute(), который, по крайней мере, завершился быстрее, но, очевидно, потребовал бы от меня перебора всех usimth и сохранения их отдельно. В качестве альтернативы у меня также есть доступ к большему количеству процессоров и памяти, например. 14 ЦП/63 ГБ (использовалось для тестирования era5_bins_comb.isel(usimth=0).compute()) или 28 ЦП/126.

Просто такое ощущение, что я мог бы улучшить свой код, пропустить некоторые циклы и/или лучше использовать вычислительную мощность. Опять же, я мало что знаю о больших данных, dask и принципах, которым следует следовать. Какие улучшения я могу внести, чтобы получать данные, не дожидаясь вечно и не тратя впустую ресурсы?

Редактировать

Просто дополнительная информация: сегодня я снова запустил свой код с 14 процессорами и засек время цикла for ident in range(len(uabine)-1):. Один цикл занимает примерно 6 минут, что составит 12 часов (12*74=888 часов для файлов за весь год!). Кроме того, поток задач на панели задач выглядит действительно неоптимизированным. Попытка запустить вложенный цикл, перебирая все 80 переменных и все 120 uabine, занимает 20 секунд для каждого вложенного цикла, поэтому 20*120*80/60/60=53 часа, что еще хуже. Возможно, мне нужно найти другой способ избежать цикла уабина, который фильтрует мои данные в зависимости от направления ветра, откуда они дуют.

Общее замечание: ваши вычисления, похоже, требуют как параллельных вычислений, так и эффективного использования памяти. Python не очень хорош для параллельных вычислений. Он имеет тенденцию либо потреблять значительно больше памяти, либо плохо использовать ядра ЦП (почти никогда и то, и другое, если только вы не делаете что-то вручную, используя инструмент компиляции, конвертирующий Python в собственный код с некоторыми ограничениями). Следует иметь в виду, что Python, как правило, хорош для прототипирования, но не для быстрого/эффективного использования памяти производственного кода. Если вам нужно что-то более эффективное, то вам подойдут языки низкого уровня и инструменты компиляции (хотя и более сложные).

Jérôme Richard 03.08.2024 17:57

Спасибо за ваш @JérômeRichard. Возможно, мне стоит в какой-то момент заняться низкоуровневыми языками, так как мне, возможно, придется работать с такими данными и в будущем. Но сейчас мне нужно заставить это работать с помощью Python.

Dominik N. 04.08.2024 05:21

Я не знаком с вашими тут алгоритмами, но раз в году подводятся итоги, разве это не делается? Могут ли ваши новые расчеты объединить анализ существующих годовых данных (которые все еще меняются до конца года) с прошлыми годами, которые выражены в виде сводки (предварительно рассчитанной)?

halfer 04.08.2024 19:01

Привет @halfer, если я правильно тебя понял, да. Я пытаюсь сделать вышеописанное в течение одного года, и как только этот год закончится, я могу просто сохранить файл, вычислить следующий год и, в конце концов, суммировать или иметь в виду эти годы. Моя проблема заключается в том, что расчет одного года занимает много времени и, вероятно, его можно ускорить.

Dominik N. 05.08.2024 01:11

Шаблонный совет (для вопросов и ответов): Обратите внимание, что мы предпочитаем здесь технический стиль письма. Мы мягко не поощряем приветствия, «надеюсь, что вы сможете помочь», «спасибо», «заранее спасибо», благодарственные письма, приветы, добрые пожелания, подписи, «пожалуйста, можете ли вы помочь», материалы для болтовни и сокращенный txtspk, просьбы, как долго вы застрял, советы по голосованию, метакомментарии и т. д. Просто объясните свою проблему и покажите, что вы пробовали, чего ожидали и что на самом деле произошло.

halfer 06.08.2024 21:01
Структурированный массив Numpy
Структурированный массив Numpy
Однако в реальных проектах я чаще всего имею дело со списками, состоящими из нескольких типов данных. Как мы можем использовать массивы numpy, чтобы...
2
5
84
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Прежде всего, вы определенно используете правильные инструменты — xarray и dask созданы для подобных задач!

Теперь несколько комментариев:

era5  = xr.open_mfdataset(glob.glob('u10_era5-land_oper_sfc_%d*.nc'%(years[i],years[i])))

Используя open_mfdataset для своевременного открытия последовательности файлов, попробуйте использовать data_vars='minimal', coords='minimal', compat='override', чтобы ускорить процесс и избежать ненужных проверок.

Вы можете использовать xarray groupby_bins для группировки данных по направлению ветра, например:

import xarray as xr
import numpy as np
import pandas as pd

# Assuming you have a DataArray with wind direction data
# Replace this with your actual data loading
da = xr.DataArray(
    np.random.uniform(0, 360, size=(1000,)),
    dims=["time"],
    coords = {"time": pd.date_range("2024-01-01", periods=1000, freq = "h")},
)

ds = da.to_dataset(name='wind_direction')

x_bins = np.linspace(0,360,37)

dsg = ds.groupby_bins(group='wind_direction', bins=x_bins).mean(dim='time')

что приводит к:

Метод groupby_bins будет автоматически распараллелен Dask с использованием рабочих в вашем Клиенте. В приведенном выше коде я беру среднее значение групп, но вы, конечно, можете делать с ними все, что захотите.

Вы можете поэкспериментировать с более крупными размерами блоков, поскольку существующие размеры блоков довольно малы: (time, latitude, longitude)[52, 100, 166], что приводит к множеству мелких задач. Возможно, попробуйте ds = ds.chunk(chunks = {'time':52, 'latitude':-1, 'longitude':-1}) заставить каждую задачу загружать всю пространственную область.

Если вы работаете в системе HPC, воспользуйтесь Dask-Jobqueue, которая позволяет вам масштабироваться за пределы одной машины с помощью Slurm, PBS и т. д.

Привет @richsignell, большое спасибо за твои предложения. Я не знал ни об аргументах open_mfdataset, ни о функции groupby_bins! Я поиграю с ними, так как они могут пригодиться. Как вы думаете, было бы лучше разбить ds на части, чтобы время было равно -1, поскольку я хочу иметь в виду это измерение, или вы считаете, что загрузка всей пространственной области имеет больше смысла? К счастью, мой руководитель нашел хорошее решение моей проблемы, которое я опубликую здесь, но мне любопытно, смогут ли некоторые из ваших предложений еще больше улучшить этот код.

Dominik N. 06.08.2024 07:57
Ответ принят как подходящий

Мой руководитель нашел эффективное решение проблемы, которое рассчитало все данные за все годы всего за 13 часов! Его решение было:

hourcount = 0
from xhistogram.xarray import histogram
for i in range(len(years)):
     for j in range(len(months)):
            # get u
            era5  = xr.open_mfdataset(glob.glob('%d/u10_era5-land_oper_sfc_%d%02d*.nc'%(years[i],years[i],months[j]))[0])
            era5  = era5.reindex(latitude=list(reversed(era5.latitude)))
            era5  = era5.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
            
            # get v
            era5v = xr.open_mfdataset(glob.glob('%d/v10_era5-land_oper_sfc_%d%02d*.nc'%(years[i],years[i],months[j]))[0])
            era5v = era5v.reindex(latitude=list(reversed(era5v.latitude)))
            era5v = era5v.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
            
            # merge u and v
            era5  = era5.merge(era5v)
            
            era5 = era5.assign(ua=np.arctan2(-era5.v10,-era5.u10)) # from ONLY
            era5 = era5.assign(us=((era5.u10**2+era5.v10**2)**0.5*kappa/np.log(z/z0))) # make us
            era5 = era5.drop_vars(["u10","v10"]) # drop uv
            
            era5['us'] = era5.us.expand_dims({'usithr':usimths},axis=3) # expand us over usimthr dim
            
            era5 = era5.chunk({'longitude':64,'latitude':40,'time':len(era5.time),'usithr':1})
            
            era5['us'] = ((era5.us**2-era5.usithr**2)*era5.usithr*fluxk).where(era5.us>era5.usithr,0)
            era5 = era5.rename_vars({'us':'qm'})
            
            temp = histogram(era5.ua,bins=uabine,weights=era5.qm,dim=['time']).compute()/len(era5.time)
            
            if hourcount==0:
                final = temp
            else:
                final = (hourcount*final+len(era5.time)*temp)/(hourcount+len(era5.time))
    
            hourcount += len(era5.time)
            
            print(hourcount,end='\r')
    
        if i == 0:
            final.to_netcdf(directory+"flux_bins_%d.nc"%(years[i]))
        if i % 5 == 0:
            final.to_netcdf(directory+"flux_bins_%d.nc"%(years[i]))
            
    final.to_netcdf(directory+"flux_bins_all.nc")

Он сделал:

  • сосредоточьтесь на ежемесячных файлах, а не на ежегодных
  • расширить ds с помощью usithr (вместо того, чтобы перебирать их)
  • разделение данных на части
  • используя функцию histogram из xhistogram.xarray

Возможно, его можно было бы еще улучшить, например. разделение или реализация ds.groupby_bins и аргументов для open_mfdataset, как предложено @RichSignell.

Другие вопросы по теме