У нас есть функция, которую вызывает один поток (мы называем это основным потоком). В теле функции мы создаем несколько рабочих потоков для выполнения интенсивной работы с ЦП, ожидаем завершения всех потоков, а затем возвращаем результат в основной поток.
В результате вызывающий может наивно использовать функцию, а внутри он будет использовать несколько ядер.
Пока все хорошо ..
Проблема в том, что мы имеем дело с исключениями. Мы не хотим, чтобы исключения в рабочих потоках приводили к сбою приложения. Мы хотим, чтобы вызывающий объект функции мог перехватить их в основном потоке. Мы должны перехватывать исключения в рабочих потоках и распространять их на основной поток, чтобы они продолжали раскручиваться оттуда.
Как мы можем это сделать?
Лучшее, о чем я могу думать, это:
Это имеет очевидный недостаток, заключающийся в поддержке только ограниченного набора типов исключений и требует модификации всякий раз, когда добавляются новые типы исключений.





Действительно, нет хорошего и универсального способа передачи исключений из одного потока в другой.
Если, как и должно быть, все ваши исключения являются производными от std :: exception, тогда у вас может быть общий перехват исключения верхнего уровня, который каким-то образом отправит исключение в основной поток, где оно будет создано снова. Проблема в том, что вы теряете точку выброса исключения. Вы, вероятно, можете написать код, зависящий от компилятора, чтобы получить эту информацию и передать ее.
Если не все ваши исключения наследуют std :: exception, тогда у вас проблемы и вам нужно написать много ловушек верхнего уровня в своем потоке ... но решение все еще остается в силе.
Не могли бы вы сериализовать исключение в рабочем потоке, передать его обратно в основной поток, десериализовать и снова выбросить? Я ожидаю, что для того, чтобы это сработало, все исключения должны быть производными от одного и того же класса (или, по крайней мере, небольшого набора классов с оператором switch снова). Кроме того, я не уверен, что они будут сериализуемыми, я просто думаю вслух.
@Nawaz, потому что исключение, вероятно, имеет ссылки на локальные переменные потока, которые не доступны автоматически другим потокам.
Вам нужно будет сделать общий перехват для всех исключений в работнике (включая исключения, не являющиеся стандартными, например, нарушения доступа), и отправить сообщение из рабочего потока (я полагаю, у вас есть какой-то обмен сообщениями?) В управляющий поток, содержащий активный указатель на исключение, и повторно вызвать его, создав копию исключения. Затем рабочий может освободить исходный объект и выйти.
Ваша проблема в том, что вы можете получить несколько исключений из нескольких потоков, поскольку каждый может выйти из строя, возможно, по разным причинам.
Я предполагаю, что основной поток каким-то образом ожидает завершения потоков, чтобы получить результаты, или регулярно проверяет ход выполнения других потоков, и что доступ к общим данным синхронизируется.
Простым решением было бы перехватить все исключения в каждом потоке, записать их в общую переменную (в основном потоке).
После завершения всех потоков решите, что делать с исключениями. Это означает, что все остальные потоки продолжили свою обработку, что, возможно, не то, что вам нужно.
Более сложное решение состоит в том, чтобы каждый из ваших потоков проверял стратегические точки своего выполнения, если исключение было выброшено из другого потока.
Если поток генерирует исключение, оно перехватывается перед выходом из потока, объект исключения копируется в некоторый контейнер в основном потоке (как в простом решении), а для некоторой общей логической переменной устанавливается значение true.
И когда другой поток проверяет это логическое значение, он видит, что выполнение должно быть прервано, и прерывает его корректным образом.
Когда все потоки были прерваны, основной поток может обработать исключение по мере необходимости.
Исключение, созданное из потока, не будет перехвачено в родительском потоке. У потоков есть разные контексты и стеки, и, как правило, родительскому потоку не требуется оставаться там и ждать завершения дочерних процессов, чтобы он мог перехватить их исключения. Для этой уловки в коде просто нет места:
try
{
start thread();
wait_finish( thread );
}
catch(...)
{
// will catch exceptions generated within start and wait,
// but not from the thread itself
}
Вам нужно будет перехватывать исключения внутри каждого потока и интерпретировать статус выхода из потоков в основном потоке, чтобы повторно генерировать любые исключения, которые могут вам понадобиться.
Кстати, при отсутствии улова в потоке это зависит от реализации, если вообще будет выполняться раскрутка стека, т.е. деструкторы ваших автоматических переменных могут даже не вызываться до вызова terminate. Некоторые компиляторы это делают, но это не обязательно.
В настоящее время единственный способ портативный - это написать предложения catch для всех типов исключений, которые вы можете передавать между потоками, сохранить информацию где-нибудь из этого предложения catch, а затем использовать ее позже для повторной генерации исключения. Это подход, принятый Boost.Exception.
В C++ 0x вы сможете перехватить исключение с помощью catch(...), а затем сохранить его в экземпляре std::exception_ptr с помощью std::current_exception(). Затем вы можете повторно загрузить его позже из того же или другого потока с помощью std::rethrow_exception().
Если вы используете Microsoft Visual Studio 2005 или новее, то just :: thread библиотека потоков C++ 0x поддерживает std::exception_ptr. (Отказ от ответственности: это мой продукт).
Теперь это часть C++ 11 и поддерживается MSVS 2010; см. msdn.microsoft.com/en-us/library/dd293602.aspx.
Он также поддерживается gcc 4.4+ в Linux.
Круто, есть ссылка на пример использования: en.cppreference.com/w/cpp/error/exception_ptr
См. http://www.boost.org/doc/libs/release/libs/exception/doc/tutorial_exception_ptr.html. Также можно написать функцию-оболочку для любой функции, которую вы вызываете для присоединения к дочернему потоку, которая автоматически повторно генерирует (с помощью boost :: rethrow_exception) любое исключение, испускаемое дочерним потоком.
Если вы используете C++ 11, то std::future может делать именно то, что вы ищете: он может автоматически перехватывать исключения, которые попадают в верхнюю часть рабочего потока, и передавать их родительскому потоку в точке, где std::future::get называется. (За кулисами это происходит точно так же, как в ответе @AnthonyWilliams; это уже было реализовано для вас.)
Обратной стороной является то, что не существует стандартного способа «перестать заботиться» о std::future; даже его деструктор просто заблокируется, пока задача не будет выполнена. [РЕДАКТИРОВАТЬ, 2017: Поведение блокирующего деструктора - это неправильная функция Только псевдофьючерсов, возвращаемых из std::async, которую вы никогда не должны использовать в любом случае. Обычные фьючерсы не блокируются в их деструкторе. Но вы по-прежнему не можете «отменить» задачи, если используете std::future: задачи, выполняющие обещания, будут продолжать выполняться за кулисами, даже если никто больше не ожидает ответа.] Вот игрушечный пример, который может прояснить, что я имею в виду:
#include <atomic>
#include <chrono>
#include <exception>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>
bool is_prime(int n)
{
if (n == 1010) {
puts("is_prime(1010) throws an exception");
throw std::logic_error("1010");
}
/* We actually want this loop to run slowly, for demonstration purposes. */
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (int i=2; i < n; ++i) { if (n % i == 0) return false; }
return (n >= 2);
}
int worker()
{
static std::atomic<int> hundreds(0);
const int start = 100 * hundreds++;
const int end = start + 100;
int sum = 0;
for (int i=start; i < end; ++i) {
if (is_prime(i)) { printf("%d is prime\n", i); sum += i; }
}
return sum;
}
int spawn_workers(int N)
{
std::vector<std::future<int>> waitables;
for (int i=0; i < N; ++i) {
std::future<int> f = std::async(std::launch::async, worker);
waitables.emplace_back(std::move(f));
}
int sum = 0;
for (std::future<int> &f : waitables) {
sum += f.get(); /* may throw an exception */
}
return sum;
/* But watch out! When f.get() throws an exception, we still need
* to unwind the stack, which means destructing "waitables" and each
* of its elements. The destructor of each std::future will block
* as if calling this->wait(). So in fact this may not do what you
* really want. */
}
int main()
{
try {
int sum = spawn_workers(100);
printf("sum is %d\n", sum);
} catch (std::exception &e) {
/* This line will be printed after all the prime-number output. */
printf("Caught %s\n", e.what());
}
}
Я просто попытался написать аналогичный пример с использованием std::thread и std::exception_ptr, но что-то пошло не так с std::exception_ptr (с использованием libC++), поэтому я еще не получил его, чтобы он действительно работал. :(
[РЕДАКТИРОВАТЬ, 2017:
int main() {
std::exception_ptr e;
std::thread t1([&e](){
try {
::operator new(-1);
} catch (...) {
e = std::current_exception();
}
});
t1.join();
try {
std::rethrow_exception(e);
} catch (const std::bad_alloc&) {
puts("Success!");
}
}
Понятия не имею, что я делал не так в 2013 году, но уверен, что это была моя вина.]
Почему вы назначаете будущее творений названному f, а затем emplace_back? Не могли бы вы просто сделать waitables.push_back(std::async(…));, или я что-то не замечаю (он компилируется, вопрос в том, может ли утечка, но я не понимаю, как)?
Кроме того, есть ли способ раскрутить стек, прервав фьючерс вместо wait? Что-то вроде «как только одна из работ потерпела неудачу, другие больше не имеют значения».
Спустя 4 года мой ответ не устарел. :) По поводу "Почему": я думаю, это было просто для ясности (чтобы показать, что async возвращает будущее, а не что-то еще). Re "Также есть там": нет в std::future, но см. Доклад Шона Родителя «Лучший код: параллелизм» или мой «Фьючерсы с нуля», чтобы узнать о различных способах реализации этого, если вы не против переписать весь STL для начала. :) Ключевой термин для поиска - «отмена».
Спасибо за ответ. Я обязательно посмотрю переговоры, когда найду минутку.
Хорошая редакция 2017 года. То же, что принято, но с указателем на исключение с ограниченной областью действия. Я бы поставил его наверху и, может быть, даже избавился от остальных.
C++ 11 представил тип exception_ptr, который позволяет передавать исключения между потоками:
#include<iostream>
#include<thread>
#include<exception>
#include<stdexcept>
static std::exception_ptr teptr = nullptr;
void f()
{
try
{
std::this_thread::sleep_for(std::chrono::seconds(1));
throw std::runtime_error("To be passed between threads");
}
catch(...)
{
teptr = std::current_exception();
}
}
int main(int argc, char **argv)
{
std::thread mythread(f);
mythread.join();
if (teptr) {
try{
std::rethrow_exception(teptr);
}
catch(const std::exception &ex)
{
std::cerr << "Thread exited with exception: " << ex.what() << "\n";
}
}
return 0;
}
Поскольку в вашем случае у вас есть несколько рабочих потоков, вам нужно будет сохранить по одному exception_ptr для каждого из них.
Обратите внимание, что exception_ptr - это общий указатель, подобный ptr, поэтому вам нужно будет оставить хотя бы один exception_ptr, указывающий на каждое исключение, иначе они будут выпущены.
Специфика Microsoft: если вы используете исключения SEH (/EHa), пример кода также будет передавать исключения SEH, такие как нарушения доступа, что может быть не тем, что вам нужно.
А как насчет нескольких потоков, созданных из основного? Если первый поток встречает исключение и завершает работу, main () будет ждать во втором потоке join (), который может работать бесконечно. main () никогда не сможет протестировать teptr после двух соединений joins (). Кажется, что все потоки должны периодически проверять глобальный teptr и при необходимости выходить. Есть ли чистый способ справиться с этой ситуацией?
Зачем нужно сериализовать его, если оба потока находятся в одном процессе?