Запуск двух импьютеров dask-ml одновременно, а не последовательно

Я могу указать среднее и наиболее частое значение, используя dask-ml, вот так, это работает нормально:

mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height']) 
df.iloc[:, [0,1]] = mean_imputer.fit_transform(df.iloc[:,[0,1]])
df.iloc[:, [2]] = most_frequent_imputer.fit_transform(df.iloc[:,[2]])
print(df)


    Weight  Age   Height
0   100.0   2.0   5.0
1   85.0    4.5   5.0
2   70.0    7.0   5.0

Но что, если у меня есть 100 миллионов строк данных, кажется, что dask сделает два цикла, когда он мог бы сделать только один, возможно ли запустить оба импутера одновременно и/или параллельно, а не последовательно? Каким будет пример кода для достижения этого?

Я всегда думал, что он уже запускает функции параллельно - и вам не нужно делать это вручную. Вы бы как-то тестировали время, используемое функциями. Возможно, он уже запускается первым SimpleImputer в параллельных процессах — он разбивает данные на более мелкие части и запускается затем в отдельных процессах (и может использовать всю мощность ЦП) — и добавление второго SimpleImputer может не работать быстрее.

furas 22.12.2020 19:10

В моем примере most_frequent_imputer.fit_transform выполняется после mean_imputer.fit_transform, мне нужно, чтобы они выполнялись параллельно

ps0604 22.12.2020 19:23

как я уже сказал: сначала вы должны проверить, не использует ли most_frequent_imputer уже параллельный автоматически, и запуск другого mean_imputer в то же время может не помочь.

furas 22.12.2020 19:25

Кстати: найдено с помощью Google: Dask: распараллелить код с помощью dask.delayed

furas 22.12.2020 19:29
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
6
4
359
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вы можете использовать dask.delayed , как это предлагается в документации, и Dask Tutorial для распараллеливания вычислений, если сущности независимы друг от друга.

Ваш код будет выглядеть так:

from dask.distributed import Client

client = Client(n_workers=4)

from dask import delayed
import numpy as np
import pandas as pd
from dask_ml import impute

mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')

def fit_transform_mi(d):
    return mean_imputer.fit_transform(d)
def fit_transform_mfi(d):
    return most_frequent_imputer.fit_transform(d)
def setdf(a,b,df):
    df.iloc[:, [0,1]]=a
    df.iloc[:, [2]]=b
    return df

data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height']) 
a = delayed(fit_transform_mi)(df.iloc[:,[0,1]])
b = delayed(fit_transform_mfi)(df.iloc[:,[2]])
c = delayed(setdf)(a,b,df)
df= c.compute()
print(df)
client.close()

Объект c — это ленивый объект Delayed. Этот объект содержит все, что нам нужно для вычисления конечного результата, включая ссылки на все необходимые функции, их входные данные и взаимосвязь друг с другом.

Dask полезен для ускорения вычислений за счет параллельной обработки и когда данные не помещаются в памяти. В приведенном ниже примере 300 млн строк данных, содержащихся в десяти файлах, импутированы с использованием Dask. График процесса показывает, что: 1. Средний и наиболее частый импутеры выполняются параллельно; 2. Все десять файлов также обрабатываются параллельно.

Настраивать

Чтобы подготовить большой объем данных, три строки данных в вашем вопросе реплицируются, чтобы сформировать фрейм данных с 30 миллионами строк. Фрейм данных сохраняется в десяти разных файлах, что дает в общей сложности 300 миллионов строк с той же статистикой, что и в вашем вопросе.

import numpy as np
import pandas as pd

N = 10000000
weight = np.array([100, np.nan, 70]*N)
age = np.array([2, np.nan, 7]*N)
height = np.array([5, np.nan, 5]*N)
df = pd.DataFrame({'Weight': weight, 'Age': age, 'Height': height})

# Save ten large data frames to disk
for i in range(10):
    df.to_parquet(f'./df_to_impute_{i}.parquet', compression='gzip',
                  index=False)

Даск вменение

import graphviz
import dask
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer

# Read all files for imputation in a dask data frame from a specific directory
df = dd.read_parquet('./df_to_impute_*.parquet')

# Set up the imputers and columns
mean_imputer = SimpleImputer(strategy='mean')
mostfreq_imputer = SimpleImputer(strategy='most_frequent')
imputers = [mean_imputer, mostfreq_imputer]

mean_cols = ['Weight', 'Age']
freq_cols = ['Height']
columns = [mean_cols, freq_cols]

# Create a new data frame with imputed values, then visualize the computation.
df_list = []
for imputer, col in zip(imputers, columns):
    df_list.append(imputer.fit_transform(df.loc[:, col]))
imputed_df = dd.concat(df_list, axis=1)
imputed_df.visualize(filename='imputed.svg', rankdir='LR')

# Save the new data frame to disk
imputed_df.to_parquet('imputed_df.parquet', compression='gzip')

Выход

imputed_df.head()

    Weight  Age     Height
0   100.0   2.0     5.0
1   85.0    4.5     5.0
2   70.0    7.0     5.0
3   100.0   2.0     5.0
4   85.0    4.5     5.0


# Check the summary statistics make sense - 300M rows and stats as expected
imputed_df.describe().compute()

    Weight  Age     Height
count   3.000000e+08    3.000000e+08    300000000.0
mean    8.500000e+01    4.500000e+00    5.0
std     1.224745e+01    2.041241e+00    0.0
min     7.000000e+01    2.000000e+00    5.0
25%     7.000000e+01    2.000000e+00    5.0
50%     8.500000e+01    4.500000e+00    5.0
75%     1.000000e+02    7.000000e+00    5.0
max     1.000000e+02    7.000000e+00    5.0

Отличное объяснение, но я могу выбрать только один ответ

ps0604 31.12.2020 03:09

Масштабируется ли он до 100 миллионов строк?

KRKirov 31.12.2020 12:04

Разве dask не должен сделать это прозрачным?

ps0604 31.12.2020 12:09

Не обязательно.

KRKirov 31.12.2020 12:13

Delayed обеспечивает параллельные вычисления. Однако любой код Dask может заблокировать вашу память при вызове метода .compute(). То, что работает с крошечным фреймом данных, может не работать с 300 миллионами строк. Поэтому стоит проверить, работает ли решение Авинаша на больших данных (300 Мб).

KRKirov 31.12.2020 12:27

Другие вопросы по теме