Я новичок в C++ и недавно начал изучать параллельное программирование. В настоящее время я пытаюсь использовать многопоточность для сканирования системного диска и построения дерева Trie со всеми путями к файлам. Однако во время тестирования я столкнулся с проблемой.
В моем main.cpp программа работает правильно, когда индекс цикла i равен 2, 4 или 6. Однако, когда i равен 8 или 10 (иногда работает i = 8), программа вылетает с ошибкой нарушения прав доступа в строке if (m_tasks.empty()) возвращает false; в функции WorkStealQueue::pop с сообщением «Место чтения нарушения доступа 0x00000014».
Я пробовал различные подходы к решению этой проблемы, но безуспешно. Буду очень признателен за любую помощь или рекомендации сообщества по решению этой проблемы.
Заранее большое спасибо!
Ниже приведена подробная реализация кода (моя операционная платформа — Windows):
// workstealthreadpool.h
#pragma once
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <future>
#include <vector>
class WorkStealQueue
{
public:
WorkStealQueue() = default;
WorkStealQueue(const WorkStealQueue& rhs) = delete;
WorkStealQueue& operator=(const WorkStealQueue& rhs) = delete;
~WorkStealQueue() = default;
void push(std::function<void()> task);
bool pop(std::function<void()>& task);
bool steal(std::function<void()>& task);
private:
std::deque<std::function<void()>> m_tasks;
std::mutex m_mutex;
};
class WorkStealThreadPool
{
public:
explicit WorkStealThreadPool(std::size_t threadNums)
: m_stop(false) { init(threadNums); }
~WorkStealThreadPool();
template<typename Callback, typename... Args>
auto addTask(Callback&& func, Args&&... args)->std::future<typename std::result_of<Callback(Args...)>::type>;
private:
void init(std::size_t threadNums);
bool stealTask(std::function<void()>& task);
void worker(size_t index);
private:
std::vector<std::thread> m_workThreads;
std::vector<std::unique_ptr<WorkStealQueue>> m_taskQueues;
std::atomic<bool> m_stop;
static thread_local std::size_t m_index;
};
template<typename Callback, typename... Args>
auto WorkStealThreadPool::addTask(Callback&& func, Args&&... args) -> std::future<typename std::result_of<Callback(Args...)>::type>
{
using returnType = typename std::result_of<Callback(Args...)>::type;
auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<Callback>(func), std::forward<Args>(args)...));
std::future<returnType> result = task->get_future();
{
m_taskQueues[m_index]->push([task]() { (*task)(); });
}
return result;
}
// workstealthreadpool.cpp
#include "workstealthreadpool.h"
void WorkStealQueue::push(std::function<void()> task)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_tasks.emplace_back(std::move(task));
}
bool WorkStealQueue::pop(std::function<void()>& task)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_tasks.empty()) return false;
task = std::move(m_tasks.front());
m_tasks.pop_front();
return true;
}
bool WorkStealQueue::steal(std::function<void()>& task)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_tasks.empty()) return false;
task = std::move(m_tasks.back());
m_tasks.pop_back();
return true;
}
thread_local std::size_t WorkStealThreadPool::m_index;
void WorkStealThreadPool::init(std::size_t threadNums)
{
for (std::size_t i = 0; i < threadNums; ++i)
{
m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
}
}
WorkStealThreadPool::~WorkStealThreadPool()
{
m_stop = true;
for (std::thread& workerThread : m_workThreads)
{
if (workerThread.joinable())
{
workerThread.join();
}
}
}
bool WorkStealThreadPool::stealTask(std::function<void()>& task)
{
for (std::size_t i = 0; i < m_taskQueues.size(); ++i)
{
std::size_t index = (m_index + i + 1) % m_taskQueues.size();
if (m_taskQueues[index]->steal(task))
{
return true;
}
}
return false;
}
void WorkStealThreadPool::worker(std::size_t index)
{
m_index = index;
while (!m_stop)
{
std::function<void()> task;
if (m_taskQueues[m_index]->pop(task) || stealTask(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
// scanner.h
#pragma once
#include "workstealthreadpool.h"
#include <QMap>
#include <QString>
struct TrieNode
{
QMap<TrieNode*, QString> childs;
TrieNode* parent = nullptr;
};
class Scanner
{
public:
explicit Scanner(std::size_t threadNums);
virtual ~Scanner();
virtual void scanDrives(const QStringList& drives);
virtual bool isScanCompleted();
virtual std::vector<TrieNode*> fetchScanResults();
private:
void scanCore(const QString& currentPath, TrieNode* parent);
void clearTrie(TrieNode* root);
private:
TrieNode* m_root;
WorkStealThreadPool* m_threadPool;
std::mutex m_mutex;
std::vector<TrieNode*> m_fileNodes;
std::atomic<int> m_taskCount;
};
#ifdef _DEBUG
void Print(TrieNode* root);
void Print(std::vector<TrieNode*>* fileNodes);
#endif
// scanner.cpp
#include "scanner.h"
#include <QDir>
Scanner::Scanner(std::size_t threadNums)
: m_root(nullptr)
, m_threadPool(nullptr)
, m_taskCount(0)
{
if (threadNums != 0)
{
m_threadPool = new WorkStealThreadPool(threadNums);
}
}
Scanner::~Scanner()
{
delete m_threadPool;
clearTrie(m_root);
}
void Scanner::scanDrives(const QStringList& drives)
{
clearTrie(m_root);
m_fileNodes.clear();
m_root = new TrieNode();
for (const QString& drive : drives)
{
TrieNode* child = new TrieNode();
child->parent = m_root;
m_root->childs[child] = drive;
scanCore(drive, child);
}
}
bool Scanner::isScanCompleted()
{
return m_taskCount.load(std::memory_order_acquire) == 0;
}
std::vector<TrieNode*> Scanner::fetchScanResults()
{
if (!isScanCompleted())
{
std::lock_guard<std::mutex> lock(m_mutex);
return m_fileNodes;
}
return m_fileNodes;
}
void Scanner::scanCore(const QString& currentPath, TrieNode* parent)
{
QDir dir(currentPath);
if (!dir.exists()) return;
QStringList fileNames = dir.entryList(QDir::Files);
for (const QString& fileName : fileNames)
{
TrieNode* child = new TrieNode();
child->parent = parent;
parent->childs[child] = fileName;
std::lock_guard<std::mutex> lock(m_mutex);
m_fileNodes.emplace_back(child);
}
QStringList subdirNames = dir.entryList(QDir::Dirs | QDir::NoDotAndDotDot);
for (const QString& subdirName : subdirNames)
{
QString childPath = currentPath + QDir::separator() + subdirName;
TrieNode* child = new TrieNode();
child->parent = parent;
parent->childs[child] = subdirName + "/";
if (m_threadPool)
{
m_taskCount.fetch_add(1, std::memory_order_release);
m_threadPool->addTask([this, childPath, child]
{
scanCore(childPath, child);
m_taskCount.fetch_sub(1, std::memory_order_acquire);
}
);
}
else {
scanCore(childPath, child);
}
}
}
void Scanner::clearTrie(TrieNode* root)
{
if (root == nullptr || root->childs.empty()) return;
for (auto iter = root->childs.begin(); iter != root->childs.end(); ++iter)
{
clearTrie(iter.key());
}
delete root;
}
#ifdef _DEBUG
void Print(TrieNode* root)
{
static int level = 0;
if (root == nullptr || root->childs.empty()) return;
for (auto iter = root->childs.begin(); iter != root->childs.end(); ++iter)
{
qDebug().noquote() << QString(" ").repeated(level) << iter.value();
++level;
Print(iter.key());
--level;
}
}
#endif
#ifdef _DEBUG
void Print(std::vector<TrieNode*>* fileNodes)
{
for (TrieNode* fileNode : *fileNodes)
{
qDebug().noquote() << fileNode->parent->childs[fileNode];
}
}
#endif
//main.cpp
#include "scanner.h"
#include <QStorageInfo>
#include <chrono>
void TestScanDrives(Scanner& scanner, const QStringList& drives)
{
scanner.scanDrives(drives);
while (!scanner.isScanCompleted())
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
QStringList drives;
for (const QStorageInfo& drive : QStorageInfo::mountedVolumes())
{
if (drive.isValid() && drive.isReady())
{
drives << drive.rootPath();
}
}
for (int i = 2; i <= 10; i += 2)
{
Scanner scanner(i);
TestScanDrives(scanner, drives);
}
return 0;
}
Я тщательно просмотрел реализацию класса WorkStealQueue, особенно методы pop и push, чтобы убедиться, что они правильно справляются с задачами. Я изменил порядок памяти атомарной переменной m_taskCount на разные значения, чтобы посмотреть, повлияло ли это на проблему. Я попытался улучшить логику синхронизации потоков и завершения задач в WorkStealThreadPool, но существенных улучшений не увидел.
Не связано, но это if (root == nullptr || root->childs.empty()) return;
утечка памяти. Должно быть if (root == nullptr) return;
. На самом деле вы не можете удалить root
, если у него нет детей.
Спасибо за ваш отзыв. Оказывается, утечка памяти действительно была вызвана моей оплошностью, и сейчас я ее исправил. Я очень ценю вашу помощь!
у вас есть несинхронизированный доступ и изменение m_taskQueues.size()
и m_taskQueues[index]
for (std::size_t i = 0; i < threadNums; ++i)
{
m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
}
когда рабочий поток создается, он может попытаться украсть работу из потока после него, и m_taskQueues.size()
не будет синхронизирован с m_taskQueues[index]
(поскольку в m_taskQueues
нет мьютекса, а другой поток может видеть увеличение размера, но не видеть измененные векторные данные, потому что это гонка, к тому же он может не увидеть полностью построенную очередь, поскольку это не атомарные указатели), поэтому разделите этот цикл на два.
for (std::size_t i = 0; i < threadNums; ++i)
{
m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
}
for (std::size_t i = 0; i < threadNums; ++i)
{
m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
}
создание потока — это точка синхронизации, когда это произойдет, все очереди будут полностью построены, поэтому гонок на m_taskQueues.size()
не будет.
Спасибо за ваш ответ. Сейчас я нахожусь на занятиях, но попробую внести предложенные вами изменения и вернусь к вам позже.
Это сработало! Еще раз спасибо за ваше предложение; он исправил ошибку, которая беспокоила меня все утро.
Используйте отладчик для проверки стека вызовов и переменных во время сбоя.