В приведенном ниже коде перед нами стояла задача создать программу-потребитель/производитель с использованием многопоточности и найти способ предотвращения взаимоблокировок. Это мой код:
#include <iostream>
#include <stack>
#include <time.h>
#include <mutex>
#include <condition_variable>
#include <random>
#include <thread>
#include <windows.h>
#include <algorithm>
#include <stack>
#define MAX 100
#define MIN 0
std::condition_variable cv;
std::mutex mtx;
class ProducerConsumer{
public:
// Creates a stack
std::stack<int> stack_array;
// Facilitates checking of values
int producer_sum = 0;
int consumer_sum = 0;
// Generates random number
int random_number_generator(){
std::random_device rd;
std::default_random_engine re{rd()};
return re();
}
// Produces values and pushes them into the stack
void producer(){
for (int i = 0 ; i < MAX ; i++){
std::unique_lock<std::mutex> lg(mtx);
if (stack_array.size() == MAX){
std::cout << "Producer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
cv.wait(lg);
}
int rand_num = random_number_generator()%10+1;
int rand_num_sleep = random_number_generator()%100+1;
std::cout << "Current Stack Size - " << stack_array.size() << " | Pushing: " << rand_num << "... " << "and sleeping for: " << rand_num_sleep << " milliseconds" << std::endl;
producer_sum += rand_num;
stack_array.push(rand_num);
std::this_thread::sleep_for(std::chrono::milliseconds(rand_num_sleep));
cv.notify_all();
lg.unlock();
}
}
void consumer(){
for (int i = 0 ; i < MAX ; i++) {
std::unique_lock<std::mutex> lg(mtx);
if (stack_array.size() == MIN){
std::cout << "Consumer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
cv.wait(lg);
}
int rand_num_sleep = random_number_generator()%100+1;
std::cout << "Current Stack Size - " << stack_array.size() << " | Popping: " << stack_array.top() << "... " << "and sleeping for: " << rand_num_sleep << " milliseconds" << std::endl;
consumer_sum += stack_array.top();
stack_array.pop();
std::this_thread::sleep_for(std::chrono::milliseconds(rand_num_sleep));
cv.notify_all();
lg.unlock();
}
}
};
int main(){
ProducerConsumer pc;
std::thread producer_one (&ProducerConsumer::producer, &pc);
std::thread consumer_one (&ProducerConsumer::consumer, &pc);
std::thread producer_two (&ProducerConsumer::producer, &pc);
std::thread consumer_two (&ProducerConsumer::consumer, &pc);
producer_one.join();
producer_two.join();
consumer_one.join();
consumer_two.join();
std::cout << "Producer sum: " << pc.producer_sum << std::endl;
std::cout << "Consumer sum: " << pc.consumer_sum << std::endl;
std::cout << "Stack size: " << pc.stack_array.size() << std::endl;
}
При попытке запустить код бывают случаи, когда код сталкивается с взаимоблокировкой из-за того, что я предполагаю, что программа начинается с потока производителя (что понятно, поскольку я не запрограммировал способ обойти это). Но иногда код работает отлично. В других случаях программа получает ошибку сегментации или сталкивается с взаимоблокировкой в середине выполнения.
Я предполагаю, что моя реализация взаимного исключения является правильной. Если нет, может ли кто-нибудь объяснить, что не так с моим кодом? Любая помощь приветствуется. Большое спасибо!
условная переменная ожидания всегда должна быть в цикле, потому что она может проснуться, а другой поток уже принял элемент, или может проснуться без всякой причины.
Конечно, компилятор не сообщит вам об ошибке, если нет цикла. Я имею в виду, что переменная бесполезна без цикла, потому что она ненадежна.
Ваши производители используют notify_all
, и у вас есть два потребителя, это означает, что после добавления одного элемента вы будите обоих потребителей (и обоих производителей), и они оба пытаются получить один элемент. Конечно, у одного из них не будет товара, если только другой производитель не произведет товар (случайно) за это время.
Вместо:
if (stack_array.size() == MAX){
std::cout << "Producer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
cv.wait(lg);
}
вы должны просто изменить if
на while
- и то же самое в потребителях.
Обратите внимание, что зацикливание требуется, даже если вы внимательно следите за балансом количества уведомлений и потребления, потому что wait
может вернуться без всякой причины. В некоторых операционных системах иногда wait
не уверен, произошло уведомление или нет, поэтому он все равно перестает ждать. Это называется «ложным пробуждением». Затем вы проверяете, заполнена ли очередь, и если она все еще заполнена, вы ждете еще немного.
я бы всегда использовал другую перегрузку, которая принимает предикат и инкапсулирует цикл. Я нахожу немного странным, что эти две перегрузки с одним и тем же именем, но такие разные
Вам нужно, чтобы производители ждали, пока что-то будет потреблено, а потребители ждали, пока что-то было произведено. Прямо сейчас ваши потоки просто продолжают работать, когда бы они ни проснулись, независимо от того, что произошло. Например, потребитель не проверяет, есть ли что-нибудь в стеке, когда просыпается, но другой потребитель может забрать все, что было произведено, пока он спал. А затем он пытается вытолкнуть стек, и программа начинает бум.
Также необходимо защититься от «ложных пробуждений».
К счастью, есть перегрузка wait
, которая помогает справиться с обеими проблемами:
if (stack_array.size() == MAX){
std::cout << "Producer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
cv.wait(lg, [&stack_array]() { return stack_array.size() < MAX; });
}
и
if (stack_array.size() == MIN){
std::cout << "Consumer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
cv.wait(lg, [&stack_array]() { return stack_array.size() > MIN; });
}
... и, по крайней мере, как правило, вы обычно хотите использовать перегрузку wait
, которая принимает условие для ожидания.
возможно, стоит упомянуть, что в дополнение к проблеме, описанной в ответе, ваш код на самом деле не использует потоки для параллельной работы. Все они сохраняют блокировку одного и того же мьютекса на протяжении всей итерации, и только когда очередь заполнена/пуста, они останавливаются и продолжается другой поток. Рабочую нагрузку, которую вы имитируете с помощью сна, следует выполнять, не удерживая блокировку.