Я пишу небольшое приложение для удаления дубликатов файлов (и для отработки навыков работы с Rust, с которым я относительно новичок!).
В рамках обучения (и поскольку у меня есть опыт управления мультимедиа с использованием очень больших файлов) я полон решимости обеспечить эффективную работу всех ядер моей машины. С этой целью я использую крейт времени выполнения Tokio и deadqueue
в качестве реализации очереди.
Итак, код моего рабочего потока выглядит следующим образом (worker_count
в настоящее время инициализируется количеством ядер, заданным num_cores
, которое можно будет настроить из параметров командной строки):
println!("Starting up {} worker threads...", worker_count);
for n in 0..worker_count {
let rxq = queue.clone();
tokio::spawn(async move {
eprintln!("INFO Thread {}", n);
let _ = io::stderr().flush();
loop {
let ff = rxq.pop().await;
eprintln!("Popped {}", ff.fullpath().display());
process_item(ff).await;
}
});
}
и функцияprocess_item определяется следующим образом:
async fn process_item(mut item:FoundFile) -> () {
match item.calculate_sha().await {
Ok(_)=>{}
Err(e)=>{
println!("ERROR Could not checksum: {}", e)
}
}
()
}
(с помощью spawn_blocking
создан отдельный поток, который выполняет асинхронное сканирование и помещает FoundFile
объекты в очередь)
calculate_sha()
определяется в блоке impl FoundFile
:
pub async fn calculate_sha(&mut self) -> Result<String, Box<dyn Error> > {
let mut file = tokio::fs::File::open(self.fullpath()).await?;
let mut hasher = Box::new(Sha256::new());
//let mut buffer: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
let mut buffer = vec![0_u8; BUFFER_SIZE];
eprintln!("SHA calculation on {}", self.fullpath().display());
while file.read(&mut buffer).await? > 0 {
hasher.update(&buffer);
}
let final_result = encode(hasher.finalize());
self.sha = Some(final_result.clone());
Ok( final_result )
}
по сути это то же самое, что я бы сделал в C, C++, Scala и т. д. — откройте файл, создайте хеш, настройте локальный буфер для 1 чанка, затем повторно заполните буфер из файла и вставьте в хеш ( обновление состояния хеширования) до тех пор, пока в файле не закончатся данные. Таким образом, мои требования к памяти должны быть ограничены одним куском и накладными расходами на состояние хеширования. RAII должен позаботиться о том, чтобы все было очищено. (Функция encode
— это hex::encode
, кстати, здесь это не актуально!)
Компилятор принимает все это нормально, но когда я на самом деле пытаюсь его запустить, я получаю довольно бесполезную информацию:
Starting up 8 worker threads...
thread 'tokio-runtime-worker' has overflowed its stack
fatal runtime error: stack overflow
Комментирование блока file.read() / hasher.update()
означает, что он работает нормально (хотя, конечно, на самом деле ничего не происходит!).
Однако я не могу понять, что может быть причиной переполнения. Я ничего не повторяю, просто неоднократно вызываю hasher.update с локальным буфером. Как вы можете видеть из приведенного выше кода, я попытался убедиться, что оба hasher
и buffer
находятся в куче, используя Box
и Vec
соответственно, и убедился, что buffer
инициализирован, но ничего, кроме комментирования цикла, похоже, не имеет никакого значения. .
Самое странное, что когда я отслеживаю выполнение с помощью операторов eprintln!
(предполагая, что вывод STDERR не буферизуется, как и в случае с другими языками, которые я использовал), я даже не вижу строку INFO Thread {n}
.
Очевидно, что под капотом происходит что-то более тонкое, но я немного не понимаю, что это такое - может ли кто-нибудь, кто знает Токио лучше, чем я, пролить свет на это?
Большое спасибо!
Чтобы проверить, действительно ли здесь используется неограниченное использование стека или стек слишком мал, попробуйте увеличить размер стека. Вы можете сделать это, создав tokio::runtime::Runtime
вручную и вызвав tokio::runtime::Builder::thread_stack_size().
Кроме того, стек переполнялся и в режиме выпуска, или только в режиме отладки?
@ChayimFriedman интересно! Когда я собирал в режиме выпуска, он не переполнялся, а работал правильно. Почему существует несоответствие?
Что касается файлового ввода-вывода, причина, по которой я использую Tokio, заключается в распараллеливании дорогостоящих вычислительных операций, таких как вычисление хеша SHA-256.
tokio еще хуже подходит для задач с интенсивными вычислениями. Используйте для этого обычные темы. Или район.
Потому что в режиме выпуска компилятор оптимизирует больше, поэтому стек используется меньше. Простой. Это показывает, что с вашим кодом нет никаких проблем, ему просто нужен больший стек (в режиме отладки).
Обратите внимание, что IIRC vec![0; BUFFER_SIZE]
сначала выделяет данные в стеке, а затем копирует их в кучу (кстати, это может быть оптимизировано в режиме выпуска). Попробуйте с let mut buffer = std::iter::repeat (0_u8).take (BUFFER_SIZE).collect::<Vec<_>>();
@Jmb Макрос vec!
определенно этого не делает. Похоже, вы путаете это с Box::new([0; BUFFER_SIZE])
?
the reason I am using Tokio is to parallelise expensive compute operations
- это явно не один из вариантов использования Токио. См. tokio.rs/tokio/tutorial#when-not-to-use-tokio. Целью использования Tokio является ускорение массово параллельных систем ввода-вывода, таких как веб-серверы, которые параллельно обслуживают множество соединений.
Большое спасибо всем за информацию, я заставил ее работать, настроив стек thead с помощью компоновщика, и теперь я отправляюсь исследовать Rayon (и правильно его отлаживать).
В вашем коде нет неограниченного использования стека, ему просто нужен стек большего размера.
Не существует портативного способа увеличить размер стека основного потока (хотя это возможно с помощью аргументов компоновщика), но для порожденных потоков Rust предоставляет std::thread::Builder::stack_size().
Однако ваши потоки tokio
рабочие, поэтому вам нужен другой путь. К счастью, tokio
также предоставляет возможность настроить размер стека воркеров, используя tokio::runtime::Builder::thread_stack_size() при сборке среды выполнения.
Также обратите внимание, что оптимизированные сборки обычно используют меньше места в стеке, и ваш код может работать на них без изменений.
Еще раз спасибо за всю помощь и подсказки выше, поскольку я уже упоминал, что причина, по которой я сделал это, заключалась в том, чтобы узнать больше о Rust, и я определенно это сделал :-)
Сначала я был в замешательстве относительно того, как использовать tokio::runtime::Builder, поэтому решил, что публикация самостоятельного ответа будет хорошей идеей, поэтому у меня есть решение для справки!
Благодаря указателю @Chayim Friedman я реорганизовал запуск своего кода следующим образом:
#[::tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
.... do stuff ...
}
в это:
fn main() -> Result<(), Box<dyn Error>> {
let runtime = Builder::new_multi_thread()
.thread_stack_size(524288)
.enable_io()
.enable_time()
.build()
.unwrap();
runtime.block_on(async_main())
}
async fn async_main() -> Result<(), Box<dyn Error>> {
...do stuff...
}
и тогда все пошло нормально.
Однако, основываясь на другой информации, я затем полностью переработал Tokio в пользу Rayon, что значительно упростило построение пула потоков и дало мне повышение производительности до 25% (по времени, затраченному на дорогостоящие операции) на моем 8-ядерном i7.
fn main() -> Result<(), Box<dyn Error>> {
...setup...
rayon::scope(|s| {
s.spawn(move |_| {
... do stuff ...
});
... more spawns etc. ...
});
}
Использование tokio бесполезно, если все, что вы делаете, это файловый ввод-вывод. Кроме того, у вас есть небольшая ошибка, когда вы читаете не полный буфер.