Распараллеливание обработки файлов с помощью rayon

У меня есть 7 файлов CSV (по 55 МБ каждый) в моей локальной исходной папке, которые я хочу преобразовать в формат JSON и сохранить в локальной папке. Моя ОС — MacOS (четырехъядерный процессор Intel i5). По сути, это простая программа на Rust, которая запускается из консоли как

./target/release/convert <source-folder> <target-folder>

Мой подход к многопоточности с использованием потоков Rust выглядит следующим образом

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;

    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for h in handles {
        let _ = h.join();
    }

    Ok(())
}

Я запускаю его, используя time для измерения загрузки ЦП, что дает

2.93s user 0.55s system 316% cpu 1.098 total

Затем я пытаюсь реализовать ту же задачу, используя крейт rayon (threadpool):

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;
    let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;

    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        pool.install(|| {
            let _ = convert(&source_path, &target_path);
        });
    }

    Ok(())
}

Я запускаю его, используя time для измерения загрузки ЦП, что дает

2.97s user 0.53s system 98% cpu 3.561 total

Я не вижу никаких улучшений, когда использую искусственный шелк. Наверное, я неправильно использую искусственный шелк. Кто-нибудь знает, что с ним не так?

Обновление (09 апр)

После некоторого времени борьбы с проверкой ржавчины ?, просто хочу поделиться решение, может быть, это поможет другим, или кто-нибудь еще может предложить лучший подход/решение

pool.scope(move |s| {
        for source_path in paths {
            let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
            s.spawn(move |_s| {
                convert(&source_path, &target_path).unwrap();
            });
        }
    });

Но все же не бьет подход с использованием ржавчины std::thread для 113 файлов.

46.72s user 8.30s system 367% cpu 14.955 total

Обновление (10 апр)

После комментария @maxy

// rayon solution
paths.into_par_iter().for_each(|source_path| {
        let target_path = create_target_file_path(&source_path, &target_dir);

        match target_path {
            Ok(target_path) => {
                info!(
                    "Processing {}",
                    target_path.to_str().unwrap_or("Unable to convert")
                );
                let res = convert(&source_path, &target_path);
                if let Err(e) = res {
                    error!("{}", e);
                }
            }
            Err(e) => error!("{}", e),
        }
    });
    // std::thread solution
    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;
        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for handle in handles {
        let _ = handle.join();
    }

Сравнение на 57 файлах:

std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon:        23.36s user 4.08s system 324% cpu 8.464 total

может быть попробовать добавить больше файлов может 100-500. И вы увидите результат. Для ЦП его тривиальная задача - обработать ваш csv. Разница тем больше, чем больше вы продвигаетесь к сложной обработке/огромным файлам/количеству файлов, которые вы обрабатываете.

Arjun 09.04.2022 13:34

@Arjun Я пытался преобразовать 113 файлов (всего 4 ГБ). Я думаю, что этого количества должно быть достаточно, чтобы увидеть разницу. Ржавые нити: 11.64s user 2.20s system 333% cpu 4.156 total, искусственный шелк: 47.31s user 8.31s system 96% cpu 57.465 total.

fade2black 09.04.2022 13:58

@Arjun В случае потоков Rust пользователь + система намного больше, чем общее количество, а это означает, что используется несколько ядер. Но во втором случае я не вижу никаких признаков. Наверное, я неправильно использую вискоза, но не знаю, как правильно.

fade2black 09.04.2022 14:01

позвольте мне проверить ваш код на локальном компьютере и дать ему несколько запусков

Arjun 09.04.2022 14:35

Вам просто нужно использовать параллельный итератор на get_file_paths(), а район сделает все остальное.

MeetTitan 09.04.2022 17:42

@MeetTitan спасибо за предложение. Да, ты прав. Я рассматривал такой подход. Но я хотел бы реализовать это вручную, чтобы я мог управлять пулами.

fade2black 09.04.2022 21:18
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
6
66
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Документ для вискоза установить не очень понятен, но подпись:

pub fn install<OP, R>(&self, op: OP) -> R where
    R: Send,
    OP: FnOnce() -> R + Send, 

говорит, что возвращает тип R. Тот же тип R, который возвращает ваше закрытие. Так что очевидно install() нужно дождаться результата.

Это имеет смысл только в том случае, если замыкание порождает дополнительные задачи, например, используя .par_iter() внутри замыкания. Я предлагаю использовать параллельные итераторы района напрямую (вместо вашего цикла for) над списком файлов. Вам даже не нужно создавать собственный пул потоков, пул по умолчанию обычно подходит.

Если вы настаиваете на том, чтобы сделать это вручную, вам придется использовать порождать() вместо install. И вам, вероятно, придется переместить свой цикл в лямбду, переданную в объем().

Есть улучшения, но все еще далеко от простого подхода с использованием std::thread. Пожалуйста, смотрите обновления в ОП.

fade2black 09.04.2022 21:10

Я бы все равно попробовал сравнить с paths.into_par_iter().for_each(|source_path| {...}) просто потому, что это так легко сделать. Сделайте это внутри pool.install(|| { ... }), чтобы использовать свой бассейн.

maxy 09.04.2022 22:55

Вы можете сравнить в обновлении OP. Почти то же самое.

fade2black 10.04.2022 01:16

Мое предположение: потоки rayon дольше ждут файлового ввода-вывода, потому что они запрашивают следующий файл только тогда, когда поток становится бездействующим. (Rayon оптимизирован для задач, связанных с ЦП.) Ваша версия std::threads запрашивает 57 файлов параллельно, поэтому ОС уже может планировать ввод-вывод для других файлов в фоновом режиме.

maxy 10.04.2022 10:54

Другие вопросы по теме