Переполнение стека tokio-runtime-worker в функции хеширования

Я пишу небольшое приложение для удаления дубликатов файлов (и для отработки навыков работы с Rust, с которым я относительно новичок!).

В рамках обучения (и поскольку у меня есть опыт управления мультимедиа с использованием очень больших файлов) я полон решимости обеспечить эффективную работу всех ядер моей машины. С этой целью я использую крейт времени выполнения Tokio и deadqueue в качестве реализации очереди.

Итак, код моего рабочего потока выглядит следующим образом (worker_count в настоящее время инициализируется количеством ядер, заданным num_cores, которое можно будет настроить из параметров командной строки):

    println!("Starting up {} worker threads...", worker_count);
    for n in 0..worker_count {
        let rxq = queue.clone();
        tokio::spawn(async move {
            eprintln!("INFO Thread {}", n);
            let _ = io::stderr().flush();
            loop {
                let ff = rxq.pop().await;
                eprintln!("Popped {}", ff.fullpath().display());
                
                process_item(ff).await;
            }
        });
    }

и функцияprocess_item определяется следующим образом:

async fn process_item(mut item:FoundFile) -> () {
    match item.calculate_sha().await {
        Ok(_)=>{}
        Err(e)=>{
            println!("ERROR Could not checksum: {}", e)
        }
    }
    ()
}

(с помощью spawn_blocking создан отдельный поток, который выполняет асинхронное сканирование и помещает FoundFile объекты в очередь)

calculate_sha() определяется в блоке impl FoundFile:

    pub async fn calculate_sha(&mut self) -> Result<String, Box<dyn Error> > {
        let mut file = tokio::fs::File::open(self.fullpath()).await?;
        let mut hasher = Box::new(Sha256::new());

        //let mut buffer: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
        let mut buffer = vec![0_u8; BUFFER_SIZE];

        eprintln!("SHA calculation on {}", self.fullpath().display());


        while file.read(&mut buffer).await? > 0 {
            hasher.update(&buffer);
        }

        let final_result = encode(hasher.finalize());
        self.sha = Some(final_result.clone());
        Ok( final_result )
    }

по сути это то же самое, что я бы сделал в C, C++, Scala и т. д. — откройте файл, создайте хеш, настройте локальный буфер для 1 чанка, затем повторно заполните буфер из файла и вставьте в хеш ( обновление состояния хеширования) до тех пор, пока в файле не закончатся данные. Таким образом, мои требования к памяти должны быть ограничены одним куском и накладными расходами на состояние хеширования. RAII должен позаботиться о том, чтобы все было очищено. (Функция encode — это hex::encode, кстати, здесь это не актуально!)

Компилятор принимает все это нормально, но когда я на самом деле пытаюсь его запустить, я получаю довольно бесполезную информацию:

Starting up 8 worker threads...

thread 'tokio-runtime-worker' has overflowed its stack
fatal runtime error: stack overflow

Комментирование блока file.read() / hasher.update() означает, что он работает нормально (хотя, конечно, на самом деле ничего не происходит!).

Однако я не могу понять, что может быть причиной переполнения. Я ничего не повторяю, просто неоднократно вызываю hasher.update с локальным буфером. Как вы можете видеть из приведенного выше кода, я попытался убедиться, что оба hasher и buffer находятся в куче, используя Box и Vec соответственно, и убедился, что buffer инициализирован, но ничего, кроме комментирования цикла, похоже, не имеет никакого значения. .

Самое странное, что когда я отслеживаю выполнение с помощью операторов eprintln! (предполагая, что вывод STDERR не буферизуется, как и в случае с другими языками, которые я использовал), я даже не вижу строку INFO Thread {n}.

Очевидно, что под капотом происходит что-то более тонкое, но я немного не понимаю, что это такое - может ли кто-нибудь, кто знает Токио лучше, чем я, пролить свет на это?

Большое спасибо!

Использование tokio бесполезно, если все, что вы делаете, это файловый ввод-вывод. Кроме того, у вас есть небольшая ошибка, когда вы читаете не полный буфер.

Chayim Friedman 30.06.2024 15:20

Чтобы проверить, действительно ли здесь используется неограниченное использование стека или стек слишком мал, попробуйте увеличить размер стека. Вы можете сделать это, создав tokio::runtime::Runtime вручную и вызвав tokio::runtime::Builder::thread_stack_size().

Chayim Friedman 30.06.2024 15:22

Кроме того, стек переполнялся и в режиме выпуска, или только в режиме отладки?

Chayim Friedman 30.06.2024 15:23

@ChayimFriedman интересно! Когда я собирал в режиме выпуска, он не переполнялся, а работал правильно. Почему существует несоответствие?

fredex42 30.06.2024 15:57

Что касается файлового ввода-вывода, причина, по которой я использую Tokio, заключается в распараллеливании дорогостоящих вычислительных операций, таких как вычисление хеша SHA-256.

fredex42 30.06.2024 15:57

tokio еще хуже подходит для задач с интенсивными вычислениями. Используйте для этого обычные темы. Или район.

Chayim Friedman 30.06.2024 16:19

Потому что в режиме выпуска компилятор оптимизирует больше, поэтому стек используется меньше. Простой. Это показывает, что с вашим кодом нет никаких проблем, ему просто нужен больший стек (в режиме отладки).

Chayim Friedman 30.06.2024 16:20

Обратите внимание, что IIRC vec![0; BUFFER_SIZE] сначала выделяет данные в стеке, а затем копирует их в кучу (кстати, это может быть оптимизировано в режиме выпуска). Попробуйте с let mut buffer = std::iter::repeat (0_u8).take (BUFFER_SIZE).collect::<Vec<_>>();

Jmb 30.06.2024 20:52

@Jmb Макрос vec! определенно этого не делает. Похоже, вы путаете это с Box::new([0; BUFFER_SIZE])?

kmdreko 30.06.2024 21:26
the reason I am using Tokio is to parallelise expensive compute operations - это явно не один из вариантов использования Токио. См. tokio.rs/tokio/tutorial#when-not-to-use-tokio. Целью использования Tokio является ускорение массово параллельных систем ввода-вывода, таких как веб-серверы, которые параллельно обслуживают множество соединений.
Finomnis 30.06.2024 21:36

Большое спасибо всем за информацию, я заставил ее работать, настроив стек thead с помощью компоновщика, и теперь я отправляюсь исследовать Rayon (и правильно его отлаживать).

fredex42 30.06.2024 21:47
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
11
81
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

В вашем коде нет неограниченного использования стека, ему просто нужен стек большего размера.

Не существует портативного способа увеличить размер стека основного потока (хотя это возможно с помощью аргументов компоновщика), но для порожденных потоков Rust предоставляет std::thread::Builder::stack_size().

Однако ваши потоки tokio рабочие, поэтому вам нужен другой путь. К счастью, tokio также предоставляет возможность настроить размер стека воркеров, используя tokio::runtime::Builder::thread_stack_size() при сборке среды выполнения.

Также обратите внимание, что оптимизированные сборки обычно используют меньше места в стеке, и ваш код может работать на них без изменений.

Еще раз спасибо за всю помощь и подсказки выше, поскольку я уже упоминал, что причина, по которой я сделал это, заключалась в том, чтобы узнать больше о Rust, и я определенно это сделал :-)

Сначала я был в замешательстве относительно того, как использовать tokio::runtime::Builder, поэтому решил, что публикация самостоятельного ответа будет хорошей идеей, поэтому у меня есть решение для справки!

Благодаря указателю @Chayim Friedman я реорганизовал запуск своего кода следующим образом:

#[::tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
   .... do stuff ...
}

в это:

fn main() -> Result<(), Box<dyn Error>> {
    let runtime = Builder::new_multi_thread()
        .thread_stack_size(524288)
        .enable_io()
        .enable_time()
        .build()
        .unwrap();

    runtime.block_on(async_main())
}

async fn async_main() -> Result<(), Box<dyn Error>> {
   ...do stuff...
}

и тогда все пошло нормально.

Однако, основываясь на другой информации, я затем полностью переработал Tokio в пользу Rayon, что значительно упростило построение пула потоков и дало мне повышение производительности до 25% (по времени, затраченному на дорогостоящие операции) на моем 8-ядерном i7.

fn main() -> Result<(), Box<dyn Error>> {
  ...setup...

  rayon::scope(|s| {
     s.spawn(move |_| {
        ... do stuff ...
     });

     ... more spawns etc. ...
  });
}

Другие вопросы по теме