У меня есть 7 файлов CSV (по 55 МБ каждый) в моей локальной исходной папке, которые я хочу преобразовать в формат JSON и сохранить в локальной папке. Моя ОС — MacOS (четырехъядерный процессор Intel i5). По сути, это простая программа на Rust, которая запускается из консоли как
./target/release/convert <source-folder> <target-folder>
Мой подход к многопоточности с использованием потоков Rust выглядит следующим образом
fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for h in handles {
let _ = h.join();
}
Ok(())
}
Я запускаю его, используя time
для измерения загрузки ЦП, что дает
2.93s user 0.55s system 316% cpu 1.098 total
Затем я пытаюсь реализовать ту же задачу, используя крейт rayon
(threadpool):
fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
pool.install(|| {
let _ = convert(&source_path, &target_path);
});
}
Ok(())
}
Я запускаю его, используя time
для измерения загрузки ЦП, что дает
2.97s user 0.53s system 98% cpu 3.561 total
Я не вижу никаких улучшений, когда использую искусственный шелк. Наверное, я неправильно использую искусственный шелк. Кто-нибудь знает, что с ним не так?
Обновление (09 апр)
После некоторого времени борьбы с проверкой ржавчины ?, просто хочу поделиться решение, может быть, это поможет другим, или кто-нибудь еще может предложить лучший подход/решение
pool.scope(move |s| {
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
s.spawn(move |_s| {
convert(&source_path, &target_path).unwrap();
});
}
});
Но все же не бьет подход с использованием ржавчины std::thread
для 113 файлов.
46.72s user 8.30s system 367% cpu 14.955 total
Обновление (10 апр)
После комментария @maxy
// rayon solution
paths.into_par_iter().for_each(|source_path| {
let target_path = create_target_file_path(&source_path, &target_dir);
match target_path {
Ok(target_path) => {
info!(
"Processing {}",
target_path.to_str().unwrap_or("Unable to convert")
);
let res = convert(&source_path, &target_path);
if let Err(e) = res {
error!("{}", e);
}
}
Err(e) => error!("{}", e),
}
});
// std::thread solution
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for handle in handles {
let _ = handle.join();
}
Сравнение на 57 файлах:
std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon: 23.36s user 4.08s system 324% cpu 8.464 total
@Arjun Я пытался преобразовать 113 файлов (всего 4 ГБ). Я думаю, что этого количества должно быть достаточно, чтобы увидеть разницу. Ржавые нити: 11.64s user 2.20s system 333% cpu 4.156 total
, искусственный шелк: 47.31s user 8.31s system 96% cpu 57.465 total
.
@Arjun В случае потоков Rust пользователь + система намного больше, чем общее количество, а это означает, что используется несколько ядер. Но во втором случае я не вижу никаких признаков. Наверное, я неправильно использую вискоза, но не знаю, как правильно.
позвольте мне проверить ваш код на локальном компьютере и дать ему несколько запусков
Вам просто нужно использовать параллельный итератор на get_file_paths()
, а район сделает все остальное.
@MeetTitan спасибо за предложение. Да, ты прав. Я рассматривал такой подход. Но я хотел бы реализовать это вручную, чтобы я мог управлять пулами.
Документ для вискоза установить не очень понятен, но подпись:
pub fn install<OP, R>(&self, op: OP) -> R where
R: Send,
OP: FnOnce() -> R + Send,
говорит, что возвращает тип R
. Тот же тип R
, который возвращает ваше закрытие. Так что очевидно install()
нужно дождаться результата.
Это имеет смысл только в том случае, если замыкание порождает дополнительные задачи, например, используя .par_iter()
внутри замыкания. Я предлагаю использовать параллельные итераторы района напрямую (вместо вашего цикла for
) над списком файлов. Вам даже не нужно создавать собственный пул потоков, пул по умолчанию обычно подходит.
Если вы настаиваете на том, чтобы сделать это вручную, вам придется использовать порождать() вместо install
. И вам, вероятно, придется переместить свой цикл в лямбду, переданную в объем().
Есть улучшения, но все еще далеко от простого подхода с использованием std::thread
. Пожалуйста, смотрите обновления в ОП.
Я бы все равно попробовал сравнить с paths.into_par_iter().for_each(|source_path| {...})
просто потому, что это так легко сделать. Сделайте это внутри pool.install(|| { ... })
, чтобы использовать свой бассейн.
Вы можете сравнить в обновлении OP. Почти то же самое.
Мое предположение: потоки rayon дольше ждут файлового ввода-вывода, потому что они запрашивают следующий файл только тогда, когда поток становится бездействующим. (Rayon оптимизирован для задач, связанных с ЦП.) Ваша версия std::threads запрашивает 57 файлов параллельно, поэтому ОС уже может планировать ввод-вывод для других файлов в фоновом режиме.
может быть попробовать добавить больше файлов может 100-500. И вы увидите результат. Для ЦП его тривиальная задача - обработать ваш csv. Разница тем больше, чем больше вы продвигаетесь к сложной обработке/огромным файлам/количеству файлов, которые вы обрабатываете.