В настоящее время я использую данные ERA5 для расчета переменных, связанных с ветром. Хотя я могу вычислить то, что хочу, у меня возникают проблемы с эффективной реализацией, позволяющей переносить эти тяжелые данные реальным способом. Эти данные хранятся на «суперкомпьютере», к которому я могу получить доступ и запустить свой сценарий. Доступны несколько вариантов ЦП и памяти, но большую часть времени я использую либо 4 ЦП/18 ГБ, либо 7 ЦП/32 ГБ. Данные поступают в ежемесячные файлы, которые я открываю как годовые. Общее количество лет — 74, но я решил, что лучше подходить из года в год.
Вот мой код:
# Imports
import geopandas as gpd
from osgeo import gdal
import numpy as np
import matplotlib.pyplot as plt
import glob
import pickle
import xarray as xr
import time
# setting up my workers (e.g. 7)
from dask.distributed import Client
client = Client(n_workers=7)
client
# Declaring variables
directory = 'PATH_TO_LOCAL_DIR'
years = np.arange(1950,2023+1)
lon0 = 129.7
lat0 = - 27.9
lonbounds = [113.0,145.0]
latbounds = [-36.0,-16.0]
d = 300e-6
z = 10
g = 9.8
z0 = 1e-2
rho_s = 2650
rho_f = 1.2
kappa = 0.4
phi = 0.6
fluxconstant = 5
thresholdconstant = 0.082
usimths = np.linspace(0.01,0.8,80,dtype=np.float32)
r = 6371.229*1e+3
s2yr = 1/60/60/24/365
uabinwidth = 3*np.pi/180
uabine = np.arange(-180*np.pi/180,(180+1e-4)*np.pi/180,uabinwidth)
uabinm = uabine[1:]-uabinwidth/2
# Opening the data. Currently I just grabe the first year worth of data
era5 = xr.open_mfdataset(glob.glob('u10_era5-land_oper_sfc_%d*.nc'%(years[i],years[i])))
era5 = era5.reindex(latitude=list(reversed(era5.latitude)))
era5 = era5.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
era5v = xr.open_mfdataset(glob.glob('v10_era5-land_oper_sfc_%d*.nc'%(years[i],years[i])))
era5v = era5v.reindex(latitude=list(reversed(era5v.latitude)))
era5v = era5v.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
era5 = era5.merge(era5v)
Результат работыera5 на этом этапе:
<xarray.Dataset>
Dimensions: (longitude: 321, latitude: 201, time: 8759)
Coordinates:
* longitude (longitude) float32 113.0 113.1 113.2 113.3 ... 144.8 144.9 145.0
* latitude (latitude) float32 -36.0 -35.9 -35.8 -35.7 ... -16.2 -16.1 -16.0
* time (time) datetime64[ns] 1950-01-01T01:00:00 ... 1950-12-31T23:00:00
Data variables:
u10 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
v10 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
Дальше я делаю:
# Calculation
era5 = era5.assign(ua_from=np.arctan2(-era5.v10,-era5.u10))#.compute() <--- not sure if compute in between is a good idea?
for j in range(len(usimths)):
era5 = era5.assign(temp=(((era5.u10**2+era5.v10**2)*(kappa/np.log(z/z0))**2-usimths[j]**2)*fluxconstant*usimths[j]/g*rho_f/rho_s).where(((era5.u10**2+era5.v10**2)**0.5*(kappa/np.log(z/z0)))>usimths[j],0))#.compute()
era5['qm_{}'.format(str(round(usimths[j],2)))] = era5.temp
era5 = era5.drop_vars('temp')
Результат работыera5:
Output of era5:
<xarray.Dataset>
Dimensions: (longitude: 321, latitude: 201, time: 8759)
Coordinates:
* longitude (longitude) float32 113.0 113.1 113.2 113.3 ... 144.8 144.9 145.0
* latitude (latitude) float32 -36.0 -35.9 -35.8 -35.7 ... -16.2 -16.1 -16.0
* time (time) datetime64[ns] 1950-01-01T01:00:00 ... 1950-12-31T23:00:00
Data variables: (12/83)
u10 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
v10 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
ua_from (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.01 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.02 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.03 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
... ...
qm_0.75 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.76 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.77 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.78 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.79 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
qm_0.8 (time, latitude, longitude) float32 dask.array<chunksize=(52, 100, 166), meta=np.ndarray>
Окончательно:
# this part takes some time and I am still unsure if I should compute or not in between
ua_from = era5.ua_from.compute().values #<-- need these for below
bin_arrays_mean = []
bin_arrays_sum = []
for ident in range(len(uabine)-1):
era5tempid = era5.drop_vars(["u10","v10","ua_from"])
era5tempid = era5tempid.where((ua_from >= uabine[ident]) & (ua_from < uabine[ident+1]) & (era5tempid > 0))
bin_arrays_mean.append(era5tempid.mean(dim = "time", skipna=True))#.compute())
bin_arrays_sum.append(era5tempid.sum(dim = "time", skipna=True))#.compute())
# merge and rearrange for desired dataset
era5ang_mean = xr.concat(bin_arrays_mean,dim = "ang_bins").assign_coords({"ang_bins":np.arange(120, dtype=np.uint32)})
era5ang_sum = xr.concat(bin_arrays_sum,dim = "ang_bins").assign_coords({"ang_bins":np.arange(120, dtype=np.uint32)})
qm_arrays_mean = []
for var in era5ang_mean:
qm_arrays_mean.append(era5ang_mean[var].rename("qm"))
qm_arrays_sum = []
for var in era5ang_sum:
qm_arrays_sum.append(era5ang_sum[var].rename("qm"))
era5_bins_mean = xr.concat(qm_arrays_mean, dim = "usimths").assign_coords({"usimths":usimths})
era5_bins_sum = xr.concat(qm_arrays_sum, dim = "usimths").assign_coords({"usimths":usimths})
era5_bins_comb = xr.Dataset({"qmmean":era5_bins_mean, "qmsum":era5_bins_sum})
Выходные данныеera5_bins_comb (ПРИМЕЧАНИЕ. Выходные данные ниже (вычислены) на основе описанного выше, но с выбранной ТОЛЬКО одной долготой, что заняло около часа):
<xarray.Dataset>
Dimensions: (latitude: 201, ang_bins: 120, usimths: 80)
Coordinates:
longitude float32 113.0
* latitude (latitude) float32 -36.0 -35.9 -35.8 -35.7 ... -16.2 -16.1 -16.0
* ang_bins (ang_bins) uint32 0 1 2 3 4 5 6 7 ... 113 114 115 116 117 118 119
* usimths (usimths) float32 0.01 0.02 0.03 0.04 0.05 ... 0.77 0.78 0.79 0.8
Data variables:
qmmean (usimths, ang_bins, latitude) float32 nan nan nan ... nan nan nan
qmsum (usimths, ang_bins, latitude) float32 0.0 0.0 0.0 ... 0.0 0.0 0.0
На последнем шаге я хочу сохранить файл, используя era5_bins_comb.to_netcdf()
.
Боюсь, я не слишком хорошо знаком с тем, как работают xarray и dask. Конкретно, когда и когда не использовать .compute()
. Попробовав пару вещей с вычислениями между вышеперечисленными, я оказался там. Часть, которая занимает больше всего времени (например, час или больше), — это цикл for ident in range(len(uabine)-1):
. Остальное работает довольно быстро (наверное, потому что до этого я ничего не вычисляю). Кроме того, последняя часть сохранения набора данных для его временного хранения и объединения с остальными 73 годами позже также кажется сложной. Похоже, у меня может не хватить памяти и/или это займет вечность. Последнее, что я тестировал, — это использовать era5_bins_comb.isel(usimth=0).compute()
, который, по крайней мере, завершился быстрее, но, очевидно, потребовал бы от меня перебора всех usimth и сохранения их отдельно. В качестве альтернативы у меня также есть доступ к большему количеству процессоров и памяти, например. 14 ЦП/63 ГБ (использовалось для тестирования era5_bins_comb.isel(usimth=0).compute()
) или 28 ЦП/126.
Просто такое ощущение, что я мог бы улучшить свой код, пропустить некоторые циклы и/или лучше использовать вычислительную мощность. Опять же, я мало что знаю о больших данных, dask и принципах, которым следует следовать. Какие улучшения я могу внести, чтобы получать данные, не дожидаясь вечно и не тратя впустую ресурсы?
Просто дополнительная информация: сегодня я снова запустил свой код с 14 процессорами и засек время цикла for ident in range(len(uabine)-1):
. Один цикл занимает примерно 6 минут, что составит 12 часов (12*74=888 часов для файлов за весь год!). Кроме того, поток задач на панели задач выглядит действительно неоптимизированным. Попытка запустить вложенный цикл, перебирая все 80 переменных и все 120 uabine, занимает 20 секунд для каждого вложенного цикла, поэтому 20*120*80/60/60=53 часа, что еще хуже. Возможно, мне нужно найти другой способ избежать цикла уабина, который фильтрует мои данные в зависимости от направления ветра, откуда они дуют.
Спасибо за ваш @JérômeRichard. Возможно, мне стоит в какой-то момент заняться низкоуровневыми языками, так как мне, возможно, придется работать с такими данными и в будущем. Но сейчас мне нужно заставить это работать с помощью Python.
Я не знаком с вашими тут алгоритмами, но раз в году подводятся итоги, разве это не делается? Могут ли ваши новые расчеты объединить анализ существующих годовых данных (которые все еще меняются до конца года) с прошлыми годами, которые выражены в виде сводки (предварительно рассчитанной)?
Привет @halfer, если я правильно тебя понял, да. Я пытаюсь сделать вышеописанное в течение одного года, и как только этот год закончится, я могу просто сохранить файл, вычислить следующий год и, в конце концов, суммировать или иметь в виду эти годы. Моя проблема заключается в том, что расчет одного года занимает много времени и, вероятно, его можно ускорить.
Шаблонный совет (для вопросов и ответов): Обратите внимание, что мы предпочитаем здесь технический стиль письма. Мы мягко не поощряем приветствия, «надеюсь, что вы сможете помочь», «спасибо», «заранее спасибо», благодарственные письма, приветы, добрые пожелания, подписи, «пожалуйста, можете ли вы помочь», материалы для болтовни и сокращенный txtspk, просьбы, как долго вы застрял, советы по голосованию, метакомментарии и т. д. Просто объясните свою проблему и покажите, что вы пробовали, чего ожидали и что на самом деле произошло.
Прежде всего, вы определенно используете правильные инструменты — xarray и dask созданы для подобных задач!
Теперь несколько комментариев:
era5 = xr.open_mfdataset(glob.glob('u10_era5-land_oper_sfc_%d*.nc'%(years[i],years[i])))
Используя open_mfdataset
для своевременного открытия последовательности файлов, попробуйте использовать data_vars='minimal', coords='minimal', compat='override'
, чтобы ускорить процесс и избежать ненужных проверок.
Вы можете использовать xarray groupby_bins
для группировки данных по направлению ветра, например:
import xarray as xr
import numpy as np
import pandas as pd
# Assuming you have a DataArray with wind direction data
# Replace this with your actual data loading
da = xr.DataArray(
np.random.uniform(0, 360, size=(1000,)),
dims=["time"],
coords = {"time": pd.date_range("2024-01-01", periods=1000, freq = "h")},
)
ds = da.to_dataset(name='wind_direction')
x_bins = np.linspace(0,360,37)
dsg = ds.groupby_bins(group='wind_direction', bins=x_bins).mean(dim='time')
что приводит к:
Метод groupby_bins
будет автоматически распараллелен Dask с использованием рабочих в вашем Клиенте. В приведенном выше коде я беру среднее значение групп, но вы, конечно, можете делать с ними все, что захотите.
Вы можете поэкспериментировать с более крупными размерами блоков, поскольку существующие размеры блоков довольно малы: (time, latitude, longitude)[52, 100, 166]
, что приводит к множеству мелких задач. Возможно, попробуйте ds = ds.chunk(chunks = {'time':52, 'latitude':-1, 'longitude':-1})
заставить каждую задачу загружать всю пространственную область.
Если вы работаете в системе HPC, воспользуйтесь Dask-Jobqueue, которая позволяет вам масштабироваться за пределы одной машины с помощью Slurm, PBS и т. д.
Привет @richsignell, большое спасибо за твои предложения. Я не знал ни об аргументах open_mfdataset
, ни о функции groupby_bins! Я поиграю с ними, так как они могут пригодиться. Как вы думаете, было бы лучше разбить ds на части, чтобы время было равно -1, поскольку я хочу иметь в виду это измерение, или вы считаете, что загрузка всей пространственной области имеет больше смысла? К счастью, мой руководитель нашел хорошее решение моей проблемы, которое я опубликую здесь, но мне любопытно, смогут ли некоторые из ваших предложений еще больше улучшить этот код.
Мой руководитель нашел эффективное решение проблемы, которое рассчитало все данные за все годы всего за 13 часов! Его решение было:
hourcount = 0
from xhistogram.xarray import histogram
for i in range(len(years)):
for j in range(len(months)):
# get u
era5 = xr.open_mfdataset(glob.glob('%d/u10_era5-land_oper_sfc_%d%02d*.nc'%(years[i],years[i],months[j]))[0])
era5 = era5.reindex(latitude=list(reversed(era5.latitude)))
era5 = era5.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
# get v
era5v = xr.open_mfdataset(glob.glob('%d/v10_era5-land_oper_sfc_%d%02d*.nc'%(years[i],years[i],months[j]))[0])
era5v = era5v.reindex(latitude=list(reversed(era5v.latitude)))
era5v = era5v.sel(longitude=slice(lonbounds[0],lonbounds[-1]),latitude=slice(latbounds[0],latbounds[-1]))
# merge u and v
era5 = era5.merge(era5v)
era5 = era5.assign(ua=np.arctan2(-era5.v10,-era5.u10)) # from ONLY
era5 = era5.assign(us=((era5.u10**2+era5.v10**2)**0.5*kappa/np.log(z/z0))) # make us
era5 = era5.drop_vars(["u10","v10"]) # drop uv
era5['us'] = era5.us.expand_dims({'usithr':usimths},axis=3) # expand us over usimthr dim
era5 = era5.chunk({'longitude':64,'latitude':40,'time':len(era5.time),'usithr':1})
era5['us'] = ((era5.us**2-era5.usithr**2)*era5.usithr*fluxk).where(era5.us>era5.usithr,0)
era5 = era5.rename_vars({'us':'qm'})
temp = histogram(era5.ua,bins=uabine,weights=era5.qm,dim=['time']).compute()/len(era5.time)
if hourcount==0:
final = temp
else:
final = (hourcount*final+len(era5.time)*temp)/(hourcount+len(era5.time))
hourcount += len(era5.time)
print(hourcount,end='\r')
if i == 0:
final.to_netcdf(directory+"flux_bins_%d.nc"%(years[i]))
if i % 5 == 0:
final.to_netcdf(directory+"flux_bins_%d.nc"%(years[i]))
final.to_netcdf(directory+"flux_bins_all.nc")
Он сделал:
histogram
из xhistogram.xarray
Возможно, его можно было бы еще улучшить, например. разделение или реализация ds.groupby_bins
и аргументов для open_mfdataset
, как предложено @RichSignell.
Общее замечание: ваши вычисления, похоже, требуют как параллельных вычислений, так и эффективного использования памяти. Python не очень хорош для параллельных вычислений. Он имеет тенденцию либо потреблять значительно больше памяти, либо плохо использовать ядра ЦП (почти никогда и то, и другое, если только вы не делаете что-то вручную, используя инструмент компиляции, конвертирующий Python в собственный код с некоторыми ограничениями). Следует иметь в виду, что Python, как правило, хорош для прототипирования, но не для быстрого/эффективного использования памяти производственного кода. Если вам нужно что-то более эффективное, то вам подойдут языки низкого уровня и инструменты компиляции (хотя и более сложные).