Я могу указать среднее и наиболее частое значение, используя dask-ml, вот так, это работает нормально:
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height'])
df.iloc[:, [0,1]] = mean_imputer.fit_transform(df.iloc[:,[0,1]])
df.iloc[:, [2]] = most_frequent_imputer.fit_transform(df.iloc[:,[2]])
print(df)
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0
Но что, если у меня есть 100 миллионов строк данных, кажется, что dask сделает два цикла, когда он мог бы сделать только один, возможно ли запустить оба импутера одновременно и/или параллельно, а не последовательно? Каким будет пример кода для достижения этого?
В моем примере most_frequent_imputer.fit_transform
выполняется после mean_imputer.fit_transform
, мне нужно, чтобы они выполнялись параллельно
как я уже сказал: сначала вы должны проверить, не использует ли most_frequent_imputer
уже параллельный автоматически, и запуск другого mean_imputer
в то же время может не помочь.
Кстати: найдено с помощью Google: Dask: распараллелить код с помощью dask.delayed
Вы можете использовать dask.delayed , как это предлагается в документации, и Dask Tutorial для распараллеливания вычислений, если сущности независимы друг от друга.
Ваш код будет выглядеть так:
from dask.distributed import Client
client = Client(n_workers=4)
from dask import delayed
import numpy as np
import pandas as pd
from dask_ml import impute
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
def fit_transform_mi(d):
return mean_imputer.fit_transform(d)
def fit_transform_mfi(d):
return most_frequent_imputer.fit_transform(d)
def setdf(a,b,df):
df.iloc[:, [0,1]]=a
df.iloc[:, [2]]=b
return df
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height'])
a = delayed(fit_transform_mi)(df.iloc[:,[0,1]])
b = delayed(fit_transform_mfi)(df.iloc[:,[2]])
c = delayed(setdf)(a,b,df)
df= c.compute()
print(df)
client.close()
Объект c — это ленивый объект Delayed. Этот объект содержит все, что нам нужно для вычисления конечного результата, включая ссылки на все необходимые функции, их входные данные и взаимосвязь друг с другом.
Dask полезен для ускорения вычислений за счет параллельной обработки и когда данные не помещаются в памяти. В приведенном ниже примере 300 млн строк данных, содержащихся в десяти файлах, импутированы с использованием Dask. График процесса показывает, что: 1. Средний и наиболее частый импутеры выполняются параллельно; 2. Все десять файлов также обрабатываются параллельно.
Чтобы подготовить большой объем данных, три строки данных в вашем вопросе реплицируются, чтобы сформировать фрейм данных с 30 миллионами строк. Фрейм данных сохраняется в десяти разных файлах, что дает в общей сложности 300 миллионов строк с той же статистикой, что и в вашем вопросе.
import numpy as np
import pandas as pd
N = 10000000
weight = np.array([100, np.nan, 70]*N)
age = np.array([2, np.nan, 7]*N)
height = np.array([5, np.nan, 5]*N)
df = pd.DataFrame({'Weight': weight, 'Age': age, 'Height': height})
# Save ten large data frames to disk
for i in range(10):
df.to_parquet(f'./df_to_impute_{i}.parquet', compression='gzip',
index=False)
import graphviz
import dask
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer
# Read all files for imputation in a dask data frame from a specific directory
df = dd.read_parquet('./df_to_impute_*.parquet')
# Set up the imputers and columns
mean_imputer = SimpleImputer(strategy='mean')
mostfreq_imputer = SimpleImputer(strategy='most_frequent')
imputers = [mean_imputer, mostfreq_imputer]
mean_cols = ['Weight', 'Age']
freq_cols = ['Height']
columns = [mean_cols, freq_cols]
# Create a new data frame with imputed values, then visualize the computation.
df_list = []
for imputer, col in zip(imputers, columns):
df_list.append(imputer.fit_transform(df.loc[:, col]))
imputed_df = dd.concat(df_list, axis=1)
imputed_df.visualize(filename='imputed.svg', rankdir='LR')
# Save the new data frame to disk
imputed_df.to_parquet('imputed_df.parquet', compression='gzip')
imputed_df.head()
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0
3 100.0 2.0 5.0
4 85.0 4.5 5.0
# Check the summary statistics make sense - 300M rows and stats as expected
imputed_df.describe().compute()
Weight Age Height
count 3.000000e+08 3.000000e+08 300000000.0
mean 8.500000e+01 4.500000e+00 5.0
std 1.224745e+01 2.041241e+00 0.0
min 7.000000e+01 2.000000e+00 5.0
25% 7.000000e+01 2.000000e+00 5.0
50% 8.500000e+01 4.500000e+00 5.0
75% 1.000000e+02 7.000000e+00 5.0
max 1.000000e+02 7.000000e+00 5.0
Отличное объяснение, но я могу выбрать только один ответ
Масштабируется ли он до 100 миллионов строк?
Разве dask не должен сделать это прозрачным?
Не обязательно.
Delayed обеспечивает параллельные вычисления. Однако любой код Dask может заблокировать вашу память при вызове метода .compute(). То, что работает с крошечным фреймом данных, может не работать с 300 миллионами строк. Поэтому стоит проверить, работает ли решение Авинаша на больших данных (300 Мб).
Я всегда думал, что он уже запускает функции параллельно - и вам не нужно делать это вручную. Вы бы как-то тестировали время, используемое функциями. Возможно, он уже запускается первым
SimpleImputer
в параллельных процессах — он разбивает данные на более мелкие части и запускается затем в отдельных процессах (и может использовать всю мощность ЦП) — и добавление второгоSimpleImputer
может не работать быстрее.