Я пытаюсь открыть несколько потоков через цикл, где каждый поток является экземпляром класса, конструктор которого перегружен таким образом, что он автоматически запускает нужный код, эта функция возвращает unordered_list, и я хотел бы получить его для этого конкретного экземпляра. затем добавить в окончательный unordered_list
Я пытался использовать фьючерсы и обещания, но в конечном итоге я запутался, когда пытался. Этот проект призван бросить мне вызов и помочь мне изучить многопоточность в C++.
//class to be instantiated per thread
class WordCounter {
public:
std::unordered_map<std::string, int> thisWordCount;
std::string word;
WordCounter(std::string filepath) {}//will be overloaded
~WordCounter() {}//destructor
std::unordered_map<std::string, int>operator()(std::string filepath) const {}//overloaded constructor signature
std::unordered_map<std::string, int>operator()(std::string currentFile) {//overloaded constructor implementation
fstream myReadFile;
myReadFile.open(currentFile);
if (!!!myReadFile) {
cout << "Unable to open file";
exit(1); // terminate with error
}
else if (myReadFile.is_open()) {
while (!myReadFile.eof()) {
while (myReadFile >> word) {
++thisWordCount[word];
}
}
}
myReadFile.close();
return thisWordCount;
}
};
int main(int argc, char** argv)
{
std::vector<std::thread> threads;//store instantiated threads using WordCounter
static std::unordered_map<std::string, int> finalWordCount; //append result from each thread to this unordered_list only when a particular thread finish's reading a file
vector<string> fileName = { "input1.txt" , "input2.txt" };//filepaths to the files used
for (int i = 0; i < fileName.size(); ++i)//loop through vector of filepaths to open a thread for each file to then be processed by that thread
{
std::string currentFile = DIR + fileName[i];
std::thread _newThread(new WordCount(currentFile); //is this how the thread would be created?
threads.emplace_back(_newThread);//store new thread in a vector
//I want to read through the vector when a particular thread finishes and append that particular threads result to finalWordCount
}
}
Начнем с написания многопоточной countWords
функции. Это даст нам общее представление о том, что должен делать код, а затем мы заполним недостающие части.
countWords
countWords
подсчитывает частоты слов в каждом файле в векторе имен файлов. Делает это параллельно.
Обзор шагов:
finalWordCount
)WordCounter
, когда это будет сделаноWordCounter
.finalWordCount
Объект WordCounter
принимает имя файла в качестве входных данных при запуске потока.
Отсутствующие детали:
makeWordCounter
Реализация:
using std::unordered_map;
using std::string;
using std::vector;
unordered_map<string, int> countWords(vector<string> const& filenames) {
// Create vector of threads
vector<std::thread> threads;
threads.reserve(filenames.size());
// We have to have a lock because maps aren't thread safe
std::mutex map_lock;
// The final result goes here
unordered_map<std::string, int> totalWordCount;
// Define the callback function
// This operation is basically free
// Internally, it just copies a reference to the mutex and a reference
// to the totalWordCount
auto callback = [&](unordered_map<string, int> const& partial_count) {
// Lock the mutex so only we have access to the map
map_lock.lock();
// Update the map
for(auto count : partial_count) {
totalWordCount[count.first] += count.second;
}
// Unlock the mutex
map_lock.unlock();
};
// Create a new thread for each file
for(auto& file : filenames) {
auto word_counter = makeWordCounter(callback);
threads.push_back(std::thread(word_counter, file));
}
// Wait until all threads have finished
for(auto& thread : threads) {
thread.join();
}
return totalWordCount;
}
makeWordCounter
Наша функция makeWordCounter
очень проста: она просто создает функцию WordCounter
по шаблону обратного вызова.
template<class Callback>
WordCounter<Callback> makeWordCounter(Callback const& func) {
return WordCounter<Callback>{func};
}
WordCounter
Переменные-члены:
Функции
operator()
вызывает countWordsFromFilename
с именем файлаcountWordsFromFilename
открывает файл, убеждается, что все в порядке, и вызывает countWords
с файловым потокомcountWords
читает все слова в файловом потоке и подсчитывает количество, а затем вызывает обратный вызов с окончательным подсчетом.Поскольку WordCounter
очень прост, я просто сделал его структурой. Ему нужно только сохранить функцию Callback
, и, сделав функцию callback
общедоступной, нам не нужно писать конструктор (компилятор обрабатывает его автоматически, используя агрегатную инициализацию).
template<class Callback>
struct WordCounter {
Callback callback;
void operator()(std::string filename) {
countWordsFromFilename(filename);
}
void countWordsFromFilename(std::string const& filename) {
std::ifstream myFile(filename);
if (myFile) {
countWords(myFile);
}
else {
std::cerr << "Unable to open " + filename << '\n';
}
}
void countWords(std::ifstream& filestream) {
std::unordered_map<std::string, int> wordCount;
std::string word;
while (!filestream.eof() && !filestream.fail()) {
filestream >> word;
wordCount[word] += 1;
}
callback(wordCount);
}
};
Вы можете увидеть полный код для countWords
здесь: https://pastebin.com/WjFTkNYF
Единственное, что я добавил, это #include
s.
Шаблоны — это простой и полезный инструмент для написания кода. Их можно использовать для устранения взаимных зависимостей; сделать алгоритмы универсальными (чтобы их можно было использовать с любыми типами, которые вам нравятся); и они даже могут сделать код быстрее и эффективнее, позволяя избежать вызовов виртуальных функций-членов или указателей на функции.
Давайте посмотрим на действительно простой шаблон класса, представляющий пару:
template<class First, class Second>
struct pair {
First first;
Second second;
};
Здесь мы объявили pair
как struct
, потому что мы хотим, чтобы все члены были общедоступными.
Обратите внимание, что нет ни типа First
, ни типа Second
. Когда мы используем имена First
и Second
, на самом деле мы говорим, что «в контексте класса pair
имя First
будет представлять First
аргумент парного класса, а имя Second
будет представлять второй элемент класса парный класс.
Мы могли бы просто написать это так:
// This is completely valid too
template<class A, class B>
struct pair {
A first;
B second;
};
Использовать pair
довольно просто:
int main() {
// Create pair with an int and a string
pair<int, std::string> myPair{14, "Hello, world!"};
// Print out the first value, which is 14
std::cout << "int value: " << myPair.first << '\n';
// Print out the second value, which is "Hello, world!"
std::cout << "string value: " << myPair.second << '\n';
}
Как и обычный класс, pair
может иметь функции-члены, конструктор, деструктор... что угодно. Поскольку pair
такой простой класс, компилятор автоматически генерирует для нас конструктор и деструктор, и нам не нужно о них беспокоиться.
Шаблонные функции похожи на обычные функции. Единственная разница в том, что они имеют объявление template
перед остальным объявлением функции.
Давайте напишем простую функцию для печати пары:
template<class A, class B>
std::ostream& operator<<(std::ostream& stream, pair<A, B> pair)
{
stream << '(' << pair.first << ", " << pair.second << ')';
return stream;
}
Мы можем дать ему любое pair
, какое захотим, если оно знает, как напечатать оба элемента пары:
int main() {
// Create pair with an int and a string
pair<int, std::string> myPair{14, "Hello, world!"};
std::cout << myPair << '\n';
}
Это выводит (14, Hello, world)
.
В C++ нет типа Callback
. Нам он не нужен. Обратный вызов — это просто то, что вы используете, чтобы указать, что что-то произошло.
Давайте рассмотрим простой пример. Эта функция ищет прогрессивно увеличивающиеся числа, и каждый раз, когда она их находит, она вызывает output
, который является параметром, который мы предоставили. В данном случае output
— это обратный вызов, и мы используем его, чтобы указать, что найдено новое наибольшее число.
template<class Func>
void getIncreasingNumbers(std::vector<double> const& nums, Func output)
{
// Exit if there are no numbers
if (nums.size() == 0)
return;
double biggest = nums[0];
// We always output the first one
output(biggest);
for(double num : nums)
{
if (num > biggest)
{
biggest = num;
output(num);
}
}
}
Мы можем использовать getIncreasingNumbers
по-разному. Например, мы можем отфильтровать числа, которые не были больше предыдущего:
std::vector<double> filterNonIncreasing(std::vector<double> const& nums)
{
std::vector<double> newNums;
// Here, we use an & inside the square brackets
// This is so we can use newNums by reference
auto my_callback = [&](double val) {
newNums.push_back(val);
};
getIncreasingNumbers(nums, my_callback);
return newNums;
}
Или мы можем распечатать их:
void printNonIncreasing(std::vector<double> const& nums)
{
// Here, we don't put anything in the square brackts
// Since we don't access any local variables
auto my_callback = [](double val) {
std::cout << "New biggest number: " << val << '\n';
};
getIncreasingNums(nums, my_callback);
}
Или мы можем найти самый большой разрыв между ними:
double findBiggestJumpBetweenIncreasing(std::vector<double> const& nums)
{
double previous;
double biggest_gap = 0.0;
bool assigned_previous = false;
auto my_callback = [&](double val) {
if (not assigned_previous) {
previous = val;
assigned_previous = true;
}
else
{
double new_gap = val - previous;
if (biggest_gap < new_gap) {
biggest_gap = new_gap;
}
}
};
getIncreasingNums(nums, my_callback);
return biggest_gap;
}
Ваши перегрузки operator() в WordCounter принимают аргументы и должны предоставлять эти аргументы при запуске std::thread с WordCounter. Кроме того, вы не можете копировать поток; вам нужно либо переместить его в вектор с помощью std::move, либо построить его на месте вызова (что я и сделал в приведенном выше коде)
Спасибо за разъяснения, очень интересно! Аргумент, переданный в конструктор, представляет собой строку, содержащую путь к конкретному файлу из комбинации каталога и имени файла, через который проходит цикл, который затем считывается в myReadFile.open().
Конечно! Просто убедитесь, что вы не объединяете каталог и имя файла дважды (так что, возможно, напечатайте имя файла в операторе WordCounter () просто для проверки)
Теперь мне нужно выяснить, как читать возвращаемое значение для каждого экземпляра потока. Затем я могу добавить этот результат (возвращаемое значение) к finalWordCount одновременно, без состояния гонки или других нежелательных последствий использования потоков, которые, как я предполагаю, затем использую std::mutex для предоставления семафора (маркера) для finalWordCount? и получение возвращаемого результата будет означать использование фьючерсов, верно? Как это реализовано?
Пусть WordCounter сохранит указатель или ссылку на то место, куда он должен вставить результат. В качестве альтернативы можно сохранить обратный вызов, который вызывается после его завершения, и сделать обратный вызов ответственным за обновление результата. Если вы сделаете finalWordCount
атомарным целым числом (поищите std::atomic), вам не нужно будет делать какие-либо блокировки при его обновлении.
Хотите, чтобы я обновил свой ответ простым примером того, как их использовать?
Конечно! Это было бы очень полезно
Я повторил ответ! Дайте мне знать, что вы думаете!
Конечно! Дайте знать, если у вас есть еще вопросы!
Не могли бы вы объяснить, как вы использовали шаблоны и обратные вызовы? Это области, которых я не касался, все, что я знал раньше, это то, что вы можете вызывать функцию в соответствии с лямбда-функцией [=](){ somecodetoexecute; }. более конкретно, есть тип данных обратного вызова? Это один из способов использования обратных вызовов или довольно стандартный?
Типа данных обратного вызова не существует, и это всего лишь один из способов использования обратных вызовов. Я добавил к ответу «Шаблоны и обратные вызовы 101»!
Фантастический! Я перечитал и увидел, насколько глупым был мой вопрос! И последнее... не сделает ли ресурс finalWordCount ссылкой в каждом потоке снова последовательным весь процесс? Если каждый поток обращается к нему по одному с помощью семафора (мьютекса), это может разрушить всю суть многопоточности, верно? Нет прироста производительности по сравнению с последовательным чтением этих файлов без потоков.
Не существует поточно-безопасного способа добавления элементов на карту, поэтому вам придется использовать мьютекс. Если бы вы уже знали, какие слова проверять, вы могли бы заранее поместить эти слова в карту, а затем вы могли бы делать что-то с атомами (избегая блокировки), но вы не могли бы добавлять новые слова в карту без мьютекс. Шаг синхронизации (часть, для которой должен использоваться мьютекс) обычно является узким местом параллельного кода, но вы все равно получите ускорение, если большая часть работы может быть выполнена до синхронизации (а не во время синхронизации). ).
При этом узким местом в приведенном выше коде является чтение файлов. Ваш жесткий диск может считывать ограниченное количество данных в секунду, и вполне вероятно, что даже однопоточный код может обрабатывать данные быстрее, чем они могут быть прочитаны с жесткого диска. Лучше всего было бы читать файл в одном потоке и обрабатывать его в другом потоке.
Эй, большое спасибо за ваше понимание и использование шаблонов, это заставило меня узнать немного больше о cpp. С чтением и записью из одного и того же буфера это было бы предельным ограничением с точки зрения полного использования многопоточности в этом сценарии (и многих проблем ввода-вывода, которые необходимо решить). Можно ли создать несколько буферов, которые могут одновременно разделять и захватывать данные, предполагая, что порядок чтения данных не имеет значения? И во время процесса «разделяй и властвуй» могут быть очереди сообщений, сообщающие о прогрессе и намерениях во время чтения. например: начальный и конечный диапазон байтов, которые будут прочитаны?
Когда вы говорите о буферах, вы имеете в виду файловые буферы? Или вы имеете в виду std::unordered_map, используемый для генерации частичных подсчетов частот слов для каждого файла? В случае файлового ввода-вывода скорость буфера обычно ограничивается самим жестким диском или твердотельным накопителем, поэтому создание нескольких буферов для одного и того же файла не приведет к ускорению обработки. С другой стороны, если у вас есть отдельные файлы, поступающие с разных дисков, или если у вас есть поток данных, поступающих из сети, то может иметь смысл иметь несколько буферов ввода-вывода.
Я не предоставил ненужного дополнительного кода, поэтому для int main требуется return 0; а currentFile — это DIR (каталог) + текущий путь к файлу, который мы используем. Так что я думаю, что мы передали бы currentPath в конструктор WordCounter. который используется в реализации перегруженного конструктора в моем классе