Ошибка нарушения прав доступа при многопоточном сканировании диска с помощью C++

Я новичок в C++ и недавно начал изучать параллельное программирование. В настоящее время я пытаюсь использовать многопоточность для сканирования системного диска и построения дерева Trie со всеми путями к файлам. Однако во время тестирования я столкнулся с проблемой.

В моем main.cpp программа работает правильно, когда индекс цикла i равен 2, 4 или 6. Однако, когда i равен 8 или 10 (иногда работает i = 8), программа вылетает с ошибкой нарушения прав доступа в строке if (m_tasks.empty()) возвращает false; в функции WorkStealQueue::pop с сообщением «Место чтения нарушения доступа 0x00000014».

Я пробовал различные подходы к решению этой проблемы, но безуспешно. Буду очень признателен за любую помощь или рекомендации сообщества по решению этой проблемы.

Заранее большое спасибо!

Ниже приведена подробная реализация кода (моя операционная платформа — Windows):

// workstealthreadpool.h
#pragma once

#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <future>
#include <vector>

class WorkStealQueue
{
public:
    WorkStealQueue() = default;
    WorkStealQueue(const WorkStealQueue& rhs) = delete;
    WorkStealQueue& operator=(const WorkStealQueue& rhs) = delete;
    ~WorkStealQueue() = default;

    void push(std::function<void()> task);
    bool pop(std::function<void()>& task);
    bool steal(std::function<void()>& task);

private:
    std::deque<std::function<void()>> m_tasks;
    std::mutex m_mutex;
};

class WorkStealThreadPool
{
public:
    explicit WorkStealThreadPool(std::size_t threadNums)
        : m_stop(false) { init(threadNums); }
    ~WorkStealThreadPool();

    template<typename Callback, typename... Args>
    auto addTask(Callback&& func, Args&&... args)->std::future<typename std::result_of<Callback(Args...)>::type>;

private:
    void init(std::size_t threadNums);
    bool stealTask(std::function<void()>& task);
    void worker(size_t index);

private:
    std::vector<std::thread> m_workThreads;
    std::vector<std::unique_ptr<WorkStealQueue>> m_taskQueues;
    std::atomic<bool> m_stop;
    static thread_local std::size_t m_index;
};

template<typename Callback, typename... Args>
auto WorkStealThreadPool::addTask(Callback&& func, Args&&... args) -> std::future<typename std::result_of<Callback(Args...)>::type>
{
    using returnType = typename std::result_of<Callback(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<Callback>(func), std::forward<Args>(args)...));
    std::future<returnType> result = task->get_future();
    {
        m_taskQueues[m_index]->push([task]() { (*task)(); });
    }
    return result;
}
// workstealthreadpool.cpp
#include "workstealthreadpool.h"

void WorkStealQueue::push(std::function<void()> task)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_tasks.emplace_back(std::move(task));
}

bool WorkStealQueue::pop(std::function<void()>& task)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_tasks.empty()) return false;
    task = std::move(m_tasks.front());
    m_tasks.pop_front();
    return true;
}

bool WorkStealQueue::steal(std::function<void()>& task)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_tasks.empty()) return false;
    task = std::move(m_tasks.back());
    m_tasks.pop_back();
    return true;
}

thread_local std::size_t WorkStealThreadPool::m_index;

void WorkStealThreadPool::init(std::size_t threadNums)
{
    for (std::size_t i = 0; i < threadNums; ++i)
    {
        m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
        m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
    }
}

WorkStealThreadPool::~WorkStealThreadPool()
{
    m_stop = true;
    for (std::thread& workerThread : m_workThreads)
    {
        if (workerThread.joinable())
        {
            workerThread.join();
        }
    }
}

bool WorkStealThreadPool::stealTask(std::function<void()>& task)
{
    for (std::size_t i = 0; i < m_taskQueues.size(); ++i)
    {
        std::size_t index = (m_index + i + 1) % m_taskQueues.size();
        if (m_taskQueues[index]->steal(task))
        {
            return true;
        }
    }
    return false;
}

void WorkStealThreadPool::worker(std::size_t index)
{
    m_index = index;
    while (!m_stop)
    {
        std::function<void()> task;
        if (m_taskQueues[m_index]->pop(task) || stealTask(task))
        {
            task();
        }
        else
        {
            std::this_thread::yield();
        }
    }
}
// scanner.h
#pragma once

#include "workstealthreadpool.h"
#include <QMap>
#include <QString>

struct TrieNode
{
    QMap<TrieNode*, QString> childs;
    TrieNode* parent = nullptr;
};

class Scanner
{
public:
    explicit Scanner(std::size_t threadNums);
    virtual ~Scanner();
    virtual void scanDrives(const QStringList& drives);
    virtual bool isScanCompleted();
    virtual std::vector<TrieNode*> fetchScanResults();

private:
    void scanCore(const QString& currentPath, TrieNode* parent);
    void clearTrie(TrieNode* root);

private:
    TrieNode* m_root;
    WorkStealThreadPool* m_threadPool;
    std::mutex m_mutex;
    std::vector<TrieNode*> m_fileNodes;
    std::atomic<int> m_taskCount;
};

#ifdef _DEBUG
void Print(TrieNode* root);
void Print(std::vector<TrieNode*>* fileNodes);
#endif
// scanner.cpp
#include "scanner.h"
#include <QDir>

Scanner::Scanner(std::size_t threadNums)
    : m_root(nullptr)
    , m_threadPool(nullptr)
    , m_taskCount(0)
{
    if (threadNums != 0)
    {
        m_threadPool = new WorkStealThreadPool(threadNums);
    }
}

Scanner::~Scanner()
{
    delete m_threadPool;
    clearTrie(m_root);
}

void Scanner::scanDrives(const QStringList& drives)
{
    clearTrie(m_root);
    m_fileNodes.clear();
    m_root = new TrieNode();
    for (const QString& drive : drives)
    {
        TrieNode* child = new TrieNode();
        child->parent = m_root;
        m_root->childs[child] = drive;
        scanCore(drive, child);
    }
}

bool Scanner::isScanCompleted()
{
    return m_taskCount.load(std::memory_order_acquire) == 0;
}

std::vector<TrieNode*> Scanner::fetchScanResults()
{
    if (!isScanCompleted())
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_fileNodes;
    }
    return m_fileNodes;
}

void Scanner::scanCore(const QString& currentPath, TrieNode* parent)
{
    QDir dir(currentPath);
    if (!dir.exists()) return;

    QStringList fileNames = dir.entryList(QDir::Files);
    for (const QString& fileName : fileNames)
    {
        TrieNode* child = new TrieNode();
        child->parent = parent;
        parent->childs[child] = fileName;

        std::lock_guard<std::mutex> lock(m_mutex);
        m_fileNodes.emplace_back(child);
    }

    QStringList subdirNames = dir.entryList(QDir::Dirs | QDir::NoDotAndDotDot);
    for (const QString& subdirName : subdirNames)
    {
        QString childPath = currentPath + QDir::separator() + subdirName;
        TrieNode* child = new TrieNode();
        child->parent = parent;
        parent->childs[child] = subdirName + "/";

        if (m_threadPool)
        {
            m_taskCount.fetch_add(1, std::memory_order_release);
            m_threadPool->addTask([this, childPath, child]
                {
                    scanCore(childPath, child);
                    m_taskCount.fetch_sub(1, std::memory_order_acquire);
                }
            );
        }
        else {
            scanCore(childPath, child);
        }
    }

}

void Scanner::clearTrie(TrieNode* root)
{
    if (root == nullptr || root->childs.empty()) return;
    for (auto iter = root->childs.begin(); iter != root->childs.end(); ++iter)
    {
        clearTrie(iter.key());
    }
    delete root;
}

#ifdef _DEBUG
void Print(TrieNode* root)
{
    static int level = 0;
    if (root == nullptr || root->childs.empty()) return;
    for (auto iter = root->childs.begin(); iter != root->childs.end(); ++iter)
    {
        qDebug().noquote() << QString("    ").repeated(level) << iter.value();
        ++level;
        Print(iter.key());
        --level;
    }
}
#endif

#ifdef _DEBUG
void Print(std::vector<TrieNode*>* fileNodes)
{
    for (TrieNode* fileNode : *fileNodes)
    {
        qDebug().noquote() << fileNode->parent->childs[fileNode];
    }
}
#endif
//main.cpp
#include "scanner.h"
#include <QStorageInfo>
#include <chrono>

void TestScanDrives(Scanner& scanner, const QStringList& drives)
{
    scanner.scanDrives(drives);
    while (!scanner.isScanCompleted())
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main()
{
    QStringList drives;
    for (const QStorageInfo& drive : QStorageInfo::mountedVolumes())
    {
        if (drive.isValid() && drive.isReady())
        {
            drives << drive.rootPath();
        }
    }

    for (int i = 2; i <= 10; i += 2)
    {
        Scanner scanner(i);
        TestScanDrives(scanner, drives);
    }
    
    return 0;
}

Я тщательно просмотрел реализацию класса WorkStealQueue, особенно методы pop и push, чтобы убедиться, что они правильно справляются с задачами. Я изменил порядок памяти атомарной переменной m_taskCount на разные значения, чтобы посмотреть, повлияло ли это на проблему. Я попытался улучшить логику синхронизации потоков и завершения задач в WorkStealThreadPool, но существенных улучшений не увидел.

Используйте отладчик для проверки стека вызовов и переменных во время сбоя.

n. m. could be an AI 29.08.2024 07:58

Не связано, но это if (root == nullptr || root->childs.empty()) return; утечка памяти. Должно быть if (root == nullptr) return;. На самом деле вы не можете удалить root, если у него нет детей.

john 29.08.2024 08:01

Спасибо за ваш отзыв. Оказывается, утечка памяти действительно была вызвана моей оплошностью, и сейчас я ее исправил. Я очень ценю вашу помощь!

bd7xzz 29.08.2024 09:04
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
3
57
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

у вас есть несинхронизированный доступ и изменение m_taskQueues.size() и m_taskQueues[index]

for (std::size_t i = 0; i < threadNums; ++i)
    {
        m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
        m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
    }

когда рабочий поток создается, он может попытаться украсть работу из потока после него, и m_taskQueues.size() не будет синхронизирован с m_taskQueues[index] (поскольку в m_taskQueues нет мьютекса, а другой поток может видеть увеличение размера, но не видеть измененные векторные данные, потому что это гонка, к тому же он может не увидеть полностью построенную очередь, поскольку это не атомарные указатели), поэтому разделите этот цикл на два.

for (std::size_t i = 0; i < threadNums; ++i)
    {
        m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
    }
for (std::size_t i = 0; i < threadNums; ++i)
    {
        m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
    }

создание потока — это точка синхронизации, когда это произойдет, все очереди будут полностью построены, поэтому гонок на m_taskQueues.size() не будет.

Спасибо за ваш ответ. Сейчас я нахожусь на занятиях, но попробую внести предложенные вами изменения и вернусь к вам позже.

bd7xzz 29.08.2024 08:38

Это сработало! Еще раз спасибо за ваше предложение; он исправил ошибку, которая беспокоила меня все утро.

bd7xzz 29.08.2024 09:02

Другие вопросы по теме