Сбой потока Boost

Могу ли я использовать boost thread + atomic, построенный с флагом С++ 20. Я не нашел ничего, что упоминало бы об этой возможности в документации по ускорению этих библиотек.

У меня было приложение, которое отлично работает с gcc 7.1 c++17 boost 1.75, но при обновлении до gcc 11.1 c++20 у меня произошел сбой в потоке boost

Sanitizer не сообщает о каких-либо проблемах.

Программа использует boost condition_variable.

Live On Compiler Explorer

#include "boost/thread/thread.hpp"
#include "boost/shared_ptr.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/utility.hpp"
#include "boost/thread/condition_variable.hpp"
#include <boost/thread/thread.hpp>
#include <thread>

#include <algorithm>
#include <cassert>
#include <atomic>


#include <vector>

class Dispatcher;

class Task
{
public:
   virtual void run() = 0;

   virtual ~Task() {};
};


class TaskPool : boost::noncopyable
{
public:
   typedef boost::shared_ptr<Task>            task_ptr_type;
   typedef boost::shared_ptr<Dispatcher>  dispatcher_ptr_type;
   typedef std::vector<dispatcher_ptr_type>         thread_pool_type;
   typedef boost::posix_time::time_duration         time_duration_type;
   typedef std::size_t                              size_type;

   TaskPool(const size_type Size);

   ~TaskPool();

   size_type maxSize() const { return max_size_; }

   size_type watermark() const { return watermark_; }

   void setInactivTm(const time_duration_type& Inactivity_Time_Out);

   const time_duration_type& getInactivTm() const { return inactivity_time_out_; }

   void execute(const task_ptr_type& Task);

private:
   typedef boost::mutex            mutex_type;
   typedef mutex_type::scoped_lock lock_type;

   mutable mutex_type mutex_;

   size_type          max_size_;
   thread_pool_type * thread_pool_;

   time_duration_type inactivity_time_out_;

   size_type          watermark_;
   size_type          invocations_;
   size_type          executions_;
   size_type          loops_;
};


class Dispatcher : boost::noncopyable
{
public:
   typedef TaskPool::task_ptr_type      task_ptr_type;
   typedef TaskPool::time_duration_type time_duration_type;
   typedef TaskPool::size_type          size_type;

   Dispatcher();

   ~Dispatcher();

   void setInactivTm(const time_duration_type& Inactivity_Time_Out);

   bool waitReady(const time_duration_type& Time_Out);

   void execute(const task_ptr_type& Task);

   void terminate();

   static time_duration_type defaultActivTm()
   {
      return boost::posix_time::milliseconds( 1 );
   }

   static time_duration_type minActivTm()
   {
      return boost::posix_time::milliseconds( 1 );
   }

private:
   typedef boost::mutex              mutex_type;
   typedef boost::condition_variable condition_variable_type;
   typedef mutex_type::scoped_lock   lock_type;

   friend class Runner;

   bool Queued_() const volatile;
   bool NotQueued_() const volatile;

   void execute_();

   boost::thread thread_;
   task_ptr_type task_;

   mutable mutex_type      mutex_;
   condition_variable_type task_busy_cond_;
   condition_variable_type task_available_cond_;

   volatile bool is_terminated_;

   time_duration_type inactivity_time_out_;

   size_type invocations_;
   size_type executions_;
   size_type thread_created_;
   size_type thread_terminated_;
};


class Runner
{
public:
   explicit Runner(Dispatcher * Disp)
    : disp_( Disp )
   { }

   void operator()()
   {
      disp_->execute_();
   }

private:
   Dispatcher * const disp_;
};


Dispatcher::Dispatcher()
 : is_terminated_( false ),
   inactivity_time_out_( defaultActivTm() ),
   invocations_( 0 ),
   executions_( 0 ),
   thread_created_( 0 ),
   thread_terminated_( 0 )
{ }


Dispatcher::~Dispatcher()
{
   terminate();
}


void Dispatcher::setInactivTm(const time_duration_type& Inactivity_Time_Out)
{
   lock_type lock( mutex_ );

   inactivity_time_out_ = Inactivity_Time_Out;
   assert( inactivity_time_out_ >= minActivTm() );
}


bool Dispatcher::waitReady(const time_duration_type& Time_Out)
{
   lock_type lock( mutex_ );

   if ( !is_terminated_ &&
        (thread_.get_id() == boost::thread::id()) )
   {
      return true;
   }
   while ( Queued_() )
   {
      if ( !task_busy_cond_.timed_wait(lock,
                                       Time_Out) )
      {
         return false;
      }
   }
   return !is_terminated_;
}


void Dispatcher::execute(const task_ptr_type& Task)
{
   lock_type lock( mutex_ );

   if ( thread_.get_id() == boost::thread::id() )
   {
      //std::cout << "new thread\n";
      thread_created_ += 1;
      thread_ = boost::thread( Runner(this) );
   }
   while ( Queued_() )
   {
      task_busy_cond_.wait(lock);
   }
   if ( !is_terminated_ )
   {
      task_ = Task;
      task_available_cond_.notify_one();
   }
   invocations_ += 1;
}


void Dispatcher::terminate()
{
   is_terminated_ = true;

   thread_.interrupt();
   thread_.join();
}


bool Dispatcher::Queued_() const volatile
{
   return const_cast<const task_ptr_type&>(task_) &&
          !is_terminated_;
}


bool Dispatcher::NotQueued_() const volatile
{
   return !const_cast<const task_ptr_type&>(task_) &&
          !is_terminated_;
}


void Dispatcher::execute_()
{
   {
      lock_type lock( mutex_ );
      is_terminated_ = false;
   }

   while ( 1 )
   {
      task_ptr_type tmp_task;

      // Critical section.
      //
      {
         lock_type lock( mutex_ );

         while ( NotQueued_() )
         {
            if ( !task_available_cond_.timed_wait(lock,
                                                  inactivity_time_out_) )
            {
               thread_terminated_ += 1;
               thread_.detach();
               return;
            }
         }
         if ( is_terminated_ )
         {
            thread_terminated_ += 1;
            return;
         }
         tmp_task.swap( task_ );
         task_busy_cond_.notify_one();
      }
      // Execute task.
      //
      executions_ += 1;

      try
      {
         ////std::cout << "execution in progress\n";
         tmp_task->run();
         ////std::cout << "execution done\n";
      }
      catch (const boost::thread_interrupted&)
      {
         thread_terminated_ += 1;
         thread_.detach();
         return;
      }
      catch (...)
      {
         // Unexpected exception, ignore...
      }
   }
}


TaskPool::TaskPool(const size_type Size)
 : max_size_( Size ),
   thread_pool_( 0 ),
   inactivity_time_out_( Dispatcher::defaultActivTm() ),
   watermark_( 0 ),
   invocations_( 0 ),
   executions_( 0 ),
   loops_( 0 )
{
   assert( max_size_ > 0 );
   thread_pool_ = new thread_pool_type( max_size_ );
}


TaskPool::~TaskPool()
{
   delete thread_pool_;
}


void TaskPool::setInactivTm(const time_duration_type& Inactivity_Time_Out)
{
   lock_type lock( mutex_ );

   inactivity_time_out_ = Inactivity_Time_Out;
   assert( inactivity_time_out_ >= Dispatcher::minActivTm() );

   for (thread_pool_type::iterator iter = thread_pool_->begin();
        thread_pool_->end() != iter;
        ++iter)
   {
      dispatcher_ptr_type& p( *iter );

      if ( p )
      {
         p->setInactivTm( inactivity_time_out_ );
      }
   }
}


void TaskPool::execute(const task_ptr_type& Task)
{
   lock_type lock( mutex_ );

   invocations_ += 1;

   const time_duration_type min_iteration_timeout( boost::posix_time::microsec( 100 ) );
   const time_duration_type max_iteration_timeout( boost::posix_time::microsec( 100000 ) );

   time_duration_type timeout( 1 == max_size_ ? time_duration_type( boost::posix_time::pos_infin )
                                              : time_duration_type( boost::posix_time::microsec(0) ) );

   while ( 1 )
   {
      for (thread_pool_type::iterator iter = thread_pool_->begin();
           thread_pool_->end() != iter;
           ++iter)
      {
         dispatcher_ptr_type& p( *iter );

         loops_ += 1;

         if ( !p )
         {
            //std::cout << "new Dispatcher instance\n";
            p.reset( new Dispatcher );
            p->setInactivTm( inactivity_time_out_ );

            watermark_ = iter - thread_pool_->begin();
         }
         if ( p->waitReady( timeout ) )
         {
            p->execute( Task );
            executions_ += 1;
            return;
         }
      }
      if ( timeout != boost::posix_time::pos_infin )
      {
         timeout *= 2;

         timeout = std::max(timeout,
                            min_iteration_timeout);
         timeout = std::min(timeout,
                            max_iteration_timeout);
      }
   }
}


static TaskPool threadPool = 10;

class Wrapper : public Task
{
public:
   Wrapper()
   {
      listener = new Listener;
   }

   virtual void run()
   {
      boost::this_thread::sleep( boost::posix_time::seconds(10) );
      listener->run();
   }

   struct Listener
   {
      std::string s;
      void run()
      {
         s = "Hello";
         //std::cout << s << '\n';
      }
   };

   Listener* listener;
};

struct Executer
{
   std::vector<std::thread> threads;

   void dispatch()
   {
      //std::cout << "dispatch\n";
      for (auto i=0; i<2; ++i)
      {
         threads.push_back(std::move(std::thread([&]()
         {
            int index = 0;
            while (true)
            {
               {
                  //std::cout << "begin\n";
                  boost::shared_ptr<Wrapper> task( new Wrapper );
                  threadPool.execute( task );
                  //std::cout << "end\n";
               }

               if (index % 1000 == 0) boost::this_thread::sleep( boost::posix_time::seconds(5) );
            }
         })));
      }
   }

   ~Executer()
   {
      for (auto i=0; i<2; ++i) threads[i].join();
   }
};

int main()
{
  std::thread t1([](){Executer a; a.dispatch();});
  std::thread t2([](){Executer a; a.dispatch();});
  t1.join();
  t2.join();
}

Общий указатель имеет use_count_ = 1 weak_count_ = 1. Я не знаю, почему слабый счет не равен 0.

Любая помощь, как найти первопричину?

Вы сами собирали библиотеки Boost? Возможно, его придется пересобрать с помощью нового компилятора, особенно после такого большого скачка версии.

Some programmer dude 14.04.2023 08:58

Да я сам собирал, тестировал 1.75 и 1.81

Peter 14.04.2023 09:07

Для чего показывать 4 случайных имени идентификатора, но без кода? Мы, очевидно, не можем видеть проблему без. И вопрос «Могу ли я использовать boost thread + atomic, построенный с флагом С++ 20», очевидно, «да». Голосование за закрытие.

sehe 14.04.2023 19:33

Вот адаптированная версия кода здесь: godbolt.org/z/44vEeoqe7

Peter 14.04.2023 20:20

@Peter Я только случайно увидел этот комментарий. Вы, вероятно, должны были отредактировать вопрос - если это улучшит его

sehe 15.04.2023 13:34

@sehe, да, я отредактировал это вчера после твоего комментария

Peter 15.04.2023 13:40

Я не заметил. Я только что отредактировал его, так что это действительно улучшило вопрос.

sehe 15.04.2023 13:55

Программа-минимум показывает проблему? Что он должен делать? Это просто кажется, что висит для меня.

sehe 15.04.2023 14:02

Я не могу воспроизвести проблему. Это не висит. Нет сообщений для вывода

Peter 15.04.2023 14:05

Это действительно не говорит мне, что он должен делать. Он должен прекратиться? Вот инструментальная, слегка переработанная версия, которая хотя бы показывает некоторую информацию о прогрессе/работе (о, и уменьшила задержки на 5 и исправила ошибку с index % 1000): coliru.stacked-crooked.com/a/a196eabc72baa985. С самого начала он пахнет слишком сложно, и злоупотребление volatile тоже нехороший знак.

sehe 15.04.2023 14:31

Я запускаю адрес потока дезинфицирующего средства undefined, но не получаю никаких предупреждений. Я думаю, проблема где-то еще в моем коде, может быть в создании задачи (Wrapper). Я не знаю, как продвигаться вперед, у вас есть расширенные статьи о том, как расследовать сбой.

Peter 15.04.2023 14:41

Давайте продолжим обсуждение в чате.

sehe 15.04.2023 14:42
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
12
148
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мы обсуждали это, вероятно, связано с бесконечной рекурсией с операторами Boost в C++20. Тогда я заметил:

Я очень удивлен кодом в сочетании с компилятором С++ 20... Код пахнет "устаревшим" - почти Java-esque - и просто изобилует злоупотреблением необработанными указателями и форсированными обратными портами для вещей, которые имеют был стандартизирован в С++ 11? здесь

и

Хотели бы вы видеть версию своего кода только для стандартной библиотеки? Таким образом, вы можете полностью забыть о проблемах с совместимостью Boost. здесь

Вот эта версия: Coliru в 243 строчках кода. Это на 210 строк меньше, чем в оригинале, и с меньшим количеством запахов¹ и без Boost :)

Обратите внимание, что я изменил интерфейс Task::run на std::stop_token, потому что исходный код использовал нестандартное прерывание потока Boost. Если вы хотите подражать старому поведению, вы можете добавить throw boost::thread_interrupted изнутри двух помощников interruptible_XXX. Конечно, вам также придется обрабатывать их на верхнем уровне вашего потока.

Если прерывание когда-либо использовалось только для завершения цикла диспетчера, а не для фактического взаимодействия с пользовательскими реализациями Task, то просто удалите аргумент stop_token :)

Добавив немного причудливой трассировки и ограничив длину прогона (#define SHORT_DEMO), мы получим

Прямой эфир на Колиру

#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;

namespace { // diagnostics tracing helpers
    auto now = std::chrono::high_resolution_clock::now;

    static auto timestamp() {
        static auto start = now();
        return (now() - start) / 1.ms;
    }

    static std::atomic_int tid_gen = 0;
    thread_local int       tid     = tid_gen++;
    std::mutex             console_mx;

    void trace(auto const&... args) {
        std::lock_guard lk(console_mx);
        std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
        (std::cout << ... << args) << std::endl;
    }

    template <typename> struct CtxTracer {
        const std::string_view _ctx;
        int const              id = [] {
            static std::atomic_int idgen = 0;
            return ++idgen;
        }();
        void operator()(auto const&... args) const { ::trace(_ctx, " #", id, "\t", args...); }
    };

#define TRACE_CTX(T) CtxTracer<struct Tag ## T> trace{#T};

} // namespace

namespace {
    // helpers to help replace boost::thread_interrupted with std::stop_token
    template <typename Lockable, typename Duration, typename Predicate>
    bool interruptible_wait_for(std::condition_variable& cv, std::unique_lock<Lockable>& lock,
                                Duration const& dur, std::stop_token stoken, Predicate pred) {

        // see https://stackoverflow.com/a/66309629/85371
        std::stop_callback callback(stoken, [&cv, initial = true, &mx = *lock.mutex()]() mutable {
            if (std::exchange(initial, false)) // while constructing the callback
                return;                        // avoid dead-lock
            mx.lock();
            mx.unlock();
            cv.notify_all();
        });

        cv.wait_for(lock, dur, [&] { return stoken.stop_requested() || pred(); });
        return pred();
    }

    template <typename Duration> // returns true if stop requested
    static bool interruptible_sleep_for(Duration const& dur, std::stop_token stoken) {
        std::mutex       mutex_;
        std::unique_lock lk{mutex_};
#if 1
        std::condition_variable cv;
        interruptible_wait_for(cv, lk, dur, stoken, std::false_type{});
#else
        // cleaner, but trips up threadsan in many versions
        std::condition_variable_any cv;
        cv.wait_for(lk, stoken, dur, std::false_type{});
#endif
        return stoken.stop_requested();
    }
} // namespace

struct Task {
    virtual ~Task()                   = default;
    virtual void run(std::stop_token) = 0;
};

using mutex_type    = std::mutex;
using cond_var_type = std::condition_variable;
using lock_type     = std::unique_lock<mutex_type>;
using duration_type = std::chrono::steady_clock::duration;
using task_ptr_type = std::shared_ptr<Task>;

/*
 * Conceptually a single thread that services a queue of tasks, until no task is available for a given idle timeout.
 * The queue depth is 1. That is, at most one task can be queued while at most one task is running on the thread.
 * The idle timeout can be modified during execution
 */
class Dispatcher {
    TRACE_CTX(Dispatcher)
    Dispatcher(Dispatcher const&)            = delete;
    Dispatcher& operator=(Dispatcher const&) = delete;

  public:
    Dispatcher(duration_type t = default_idle_tm) : idle_timeout_(t) {}

    void idle_timeout(duration_type t) { idle_timeout_ = min(min_idle_tm, t); }

    // fails if queue slot taken and thread busy > timeout
    bool enqueue(duration_type timeout, task_ptr_type Task);

    static constexpr duration_type default_idle_tm = 1ms;
    static constexpr duration_type min_idle_tm     = 1ms;

  private:
    task_ptr_type pop(duration_type timeout) noexcept;
    void          worker_impl(std::stop_token stoken) noexcept;

    //////
    mutable mutex_type mutex_;
    cond_var_type      producers_, consumer_; // SEHE combine and `notify_all`?
    task_ptr_type      queued_;
    std::jthread       worker_; // the consumer thread

    //////
    std::atomic<duration_type> idle_timeout_;
    struct { std::atomic<size_t> queued, executed, created, terminated; } disp_stats;
};

bool Dispatcher::enqueue(duration_type timeout, task_ptr_type aTask) {
    lock_type lock(mutex_);

    if (!worker_.joinable()) {
        trace("new thread");
        disp_stats.created += 1;
        worker_ = std::jthread([this](std::stop_token stoken) { worker_impl(stoken); });
    }

    if (interruptible_wait_for(producers_, lock, timeout, worker_.get_stop_token(),
                               [this] { return !queued_; })) {
        queued_.swap(aTask);
        consumer_.notify_one();
        disp_stats.queued += 1;
        return true;
    } else {
        return false;
    }
}

task_ptr_type Dispatcher::pop(duration_type timeout) noexcept {
    task_ptr_type task;

    lock_type lock(mutex_);
    if (interruptible_wait_for(consumer_, lock, timeout, worker_.get_stop_token(), [this] { return !!queued_; })) {
        task.swap(queued_);
        producers_.notify_one();
    }
    return task;
}

void Dispatcher::worker_impl(std::stop_token stoken) noexcept {
    duration_type cur_timeout;
    while (auto task = pop((cur_timeout = idle_timeout_))) {
        try {
            disp_stats.executed += 1;
            task->run(stoken);
        } catch (...) {
            trace("unhandled exception ignored");
        }
    }

    disp_stats.terminated += 1;
    trace("stopped idle thread (after ", cur_timeout / 1ms, "ms)");
}

class TaskPool {
    TRACE_CTX(TaskPool)
    TaskPool(TaskPool const&)            = delete; // noncopyable
    TaskPool& operator=(TaskPool const&) = delete; // noncopyable

  public:
    using dispatcher_t  = std::shared_ptr<Dispatcher>;
    using dispatchers_t = std::vector<dispatcher_t>;

    TaskPool(size_t capacity);

    size_t        maxSize() const;
    size_t        watermark() const { return tp_stats.watermark; }
    duration_type idle_timeout() const { return idle_timeout_; }
    void          idle_timeout(duration_type t);

    void execute(task_ptr_type const& Task);

  private:
    mutable mutex_type mutex_;
    dispatchers_t      dispatchers_;
    duration_type      peak_backoff_;

    std::atomic<duration_type> idle_timeout_ = Dispatcher::default_idle_tm;
    struct { std::atomic<size_t> watermark, invocations, executions, scans; } tp_stats;
};

TaskPool::TaskPool(size_t capacity) : dispatchers_(capacity) { assert(capacity); }

void TaskPool::idle_timeout(duration_type t) {
    assert(t >= Dispatcher::min_idle_tm);
    idle_timeout_ = t;

    for (dispatcher_t const& p : dispatchers_)
        if (p)
            p->idle_timeout(t);
}

void TaskPool::execute(task_ptr_type const& Task) {
    lock_type lock(mutex_);

    bool const single = dispatchers_.size() == 1;
    tp_stats.invocations += 1;

    constexpr duration_type min = 100ms, max = 100s;
    for (duration_type w = !single ? 0s : 100s; /*true*/; w = clamp(w * 2, min, max)) {
        if (w > peak_backoff_) {
            trace("new peak backoff interval ", w / 1.0s);
            peak_backoff_ = w;
        }

        for (dispatcher_t& p : dispatchers_) {
            tp_stats.scans += 1;

            if (!p) {
                p = std::make_shared<Dispatcher>(idle_timeout_);
                tp_stats.watermark = &p - dispatchers_.data();
                trace("new Dispatcher (watermark ", tp_stats.watermark, ")");
            }

            if (p->enqueue(w, Task)) {
                tp_stats.executions += 1;
                return;
            }
        }
    }
}

size_t TaskPool::maxSize() const {
    lock_type lock(mutex_);
    return dispatchers_.size();
}

struct Wrapper : Task {
    virtual void run(std::stop_token stoken) override {
        if (!interruptible_sleep_for(10s, stoken))
            listener.run();
    }

    struct Listener {
        TRACE_CTX(Listener)
        void run() { trace("Hello"); }
    };

    Listener listener;
};

static void Demo(TaskPool& pool) {
    TRACE_CTX(Demo)

    std::stop_source stop;

    // emulated application logic that produces tasks
    auto app_logic = [&pool, stoken = stop.get_token()] {
        TRACE_CTX(app_logic)
        for (unsigned index = 0; !stoken.stop_requested(); ++index) {
            auto s = now();
            pool.execute(std::make_shared<Wrapper>());
            trace("index:", index, " enqueued in ", (now() - s) / 1.s, "s");

            if (index % 20 == 0) {
                trace("taking a break from producing tasks");
                std::this_thread::sleep_for(5s);
            }
        }
        trace("exit app_logic");
    };

    trace("start");
    std::vector<std::thread> threads;
    threads.emplace_back(app_logic);
    threads.emplace_back(app_logic);

#ifdef SHORT_DEMO
    std::this_thread::sleep_for(10s); // (2.5min);
    trace("Requesting shutdown for SHORT_DEMO");
    stop.request_stop();
#endif

    trace("joining app_logic threads");
    for (auto& th : threads)
        th.join();
    trace("joined app_logic threads");
}

int main() {
    TRACE_CTX(Main);

    std::cout << std::setprecision(2) << std::fixed;
    trace("main");

    {
        TaskPool threadPool{10};

        std::thread t1(Demo, std::ref(threadPool));
        std::thread t2(Demo, std::ref(threadPool));

        trace("joining t1..."); t1.join();
        trace("joining t2..."); t2.join();
        trace("awaiting task pool");
    }

    trace("bye");
}

С выводом как

g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -DSHORT_DEMO
./a.out
T: 0      0.00ms Main #1    main
T: 0      0.17ms Main #1    joining t1...
T: 1      0.22ms Demo #1    start
T: 2      0.27ms Demo #2    start
T: 3      0.48ms TaskPool #1    new Dispatcher (watermark 0)
T: 3      0.50ms Dispatcher #1  new thread
T: 3      0.67ms app_logiC#1   index:0 enqueued in 0.00s
T: 3      0.69ms app_logiC#1   taking a break from producing tasks
T: 4      0.72ms app_logiC#2   index:0 enqueued in 0.00s
T: 4      0.73ms app_logiC#2   taking a break from producing tasks
T: 5      0.88ms TaskPool #1    new Dispatcher (watermark 1)
T: 5      0.90ms Dispatcher #2  new thread
T: 5      0.97ms app_logiC#3   index:0 enqueued in 0.00s
T: 5      0.99ms app_logiC#3   taking a break from producing tasks
T: 6      1.17ms app_logiC#4   index:0 enqueued in 0.00s
T: 6      1.19ms app_logiC#4   taking a break from producing tasks
T: 4   5001.26ms TaskPool #1    new Dispatcher (watermark 2)
T: 4   5001.33ms Dispatcher #3  new thread
T: 4   5001.47ms app_logiC#2   index:1 enqueued in 0.00s
T: 3   5001.83ms app_logiC#1   index:1 enqueued in 0.00s
T: 5   5002.37ms TaskPool #1    new Dispatcher (watermark 3)
T: 5   5002.42ms Dispatcher #4  new thread
T: 5   5002.54ms app_logiC#3   index:1 enqueued in 0.00s
T: 5   5003.07ms app_logiC#3   index:2 enqueued in 0.00s
T: 4   5003.76ms TaskPool #1    new Dispatcher (watermark 4)
T: 4   5003.77ms Dispatcher #5  new thread
T: 4   5003.84ms app_logiC#2   index:2 enqueued in 0.00s
T: 3   5004.55ms app_logiC#1   index:2 enqueued in 0.00s
T: 6   5005.41ms TaskPool #1    new Dispatcher (watermark 5)
T: 6   5005.43ms Dispatcher #6  new thread
T: 6   5005.51ms app_logiC#4   index:1 enqueued in 0.00s
T: 6   5006.37ms app_logiC#4   index:2 enqueued in 0.00s
T: 4   5007.44ms TaskPool #1    new Dispatcher (watermark 6)
T: 4   5007.46ms Dispatcher #7  new thread
T: 4   5007.56ms app_logiC#2   index:3 enqueued in 0.00s
T: 3   5008.58ms app_logiC#1   index:3 enqueued in 0.00s
T: 5   5009.75ms TaskPool #1    new Dispatcher (watermark 7)
T: 5   5009.77ms Dispatcher #8  new thread
T: 5   5009.86ms app_logiC#3   index:3 enqueued in 0.01s
T: 6   5011.04ms app_logiC#4   index:3 enqueued in 0.00s
T: 4   5012.41ms TaskPool #1    new Dispatcher (watermark 8)
T: 4   5012.43ms Dispatcher #9  new thread
T: 4   5012.51ms app_logiC#2   index:4 enqueued in 0.00s
T: 3   5013.85ms app_logiC#1   index:4 enqueued in 0.01s
T: 5   5015.36ms TaskPool #1    new Dispatcher (watermark 9)
T: 5   5015.38ms Dispatcher #10 new thread
T: 5   5015.46ms app_logiC#3   index:4 enqueued in 0.01s
T: 6   5016.97ms app_logiC#4   index:4 enqueued in 0.01s
T: 6   5018.64ms TaskPool #1    new peak backoff interval 0.10
T: 6   6020.28ms TaskPool #1    new peak backoff interval 0.20
T: 6   8022.03ms TaskPool #1    new peak backoff interval 0.40
T: 1  10000.67ms Demo #1    Requesting shutdown for SHORT_DEMO
T: 1  10000.76ms Demo #1    joining app_logic threads
T: 2  10000.81ms Demo #2    Requesting shutdown for SHORT_DEMO
T: 2  10000.84ms Demo #2    joining app_logic threads
T: 7  10000.87ms Listener #1    Hello
T: 8  10001.11ms Listener #3    Hello
T: 6  12023.81ms TaskPool #1    new peak backoff interval 0.80
T: 6  12023.89ms app_logiC#4   index:5 enqueued in 7.01s
T: 6  12023.91ms app_logiC#4   exit app_logic
T: 3  12024.14ms app_logiC#1   index:5 enqueued in 7.01s
T: 3  12024.19ms app_logiC#1   exit app_logic
T: 9  15001.65ms Listener #6    Hello
T:10  15002.69ms Listener #7    Hello
T:11  15015.13ms Listener #9    Hello
T:12  15015.17ms Listener #8    Hello
T:13  15015.24ms Listener #13   Hello
T:14  15015.29ms Listener #12   Hello
T:15  15015.33ms Listener #17   Hello
T:16  15015.59ms Listener #19   Hello
T: 5  15015.65ms app_logiC#3   index:5 enqueued in 10.00s
T: 5  15015.67ms app_logiC#3   exit app_logic
T: 1  15015.73ms Demo #1    joined app_logic threads
T: 0  15015.80ms Main #1    joining t2...
T: 4  15016.00ms app_logiC#2   index:5 enqueued in 10.00s
T: 4  15016.02ms app_logiC#2   exit app_logic
T: 2  15016.11ms Demo #2    joined app_logic threads
T: 0  15016.20ms Main #1    awaiting task pool
T: 7  20001.13ms Dispatcher #1  stopped idle thread (after 1ms)
T: 8  20001.31ms Listener #4    Hello
T: 8  20013.48ms Dispatcher #2  stopped idle thread (after 1ms)
T: 9  25001.90ms Dispatcher #3  stopped idle thread (after 1ms)
T:10  25015.25ms Dispatcher #4  stopped idle thread (after 1ms)
T:11  25017.66ms Listener #10   Hello
T:12  25017.71ms Listener #15   Hello
T:13  25017.76ms Listener #14   Hello
T:14  25017.79ms Listener #16   Hello
T:15  25017.84ms Listener #18   Hello
T:16  25017.89ms Listener #20   Hello
T:11  25018.81ms Dispatcher #5  stopped idle thread (after 1ms)
T:13  25018.84ms Dispatcher #7  stopped idle thread (after 1ms)
T:12  25018.88ms Dispatcher #6  stopped idle thread (after 1ms)
T:14  25018.94ms Dispatcher #8  stopped idle thread (after 1ms)
T:15  25019.06ms Dispatcher #9  stopped idle thread (after 1ms)
T:16  35018.10ms Dispatcher #10 stopped idle thread (after 1ms)
T: 0  35018.30ms Main #1    bye

Вопросы дизайна

Я вижу ряд проблем с дизайном даже после улучшений

  • TaskPool — это объединенная очередь задач и пул потоков с фиксированной емкостью, где каждый «диспетчер» имеет 0-2 задачи: 0 или 1 выполняется в данный момент и 0 или 1 queued_

  • Нет кражи работы, у каждого диспетчера есть не более одного слота в очереди, независимо от того, сколько других ожидают в другом месте.

  • Постановка в очередь является узким местом. В худшем случае заблокируют на неопределенный срок. Существует время отсрочки, ведущее к тому, что время постановки в очередь одного диспетчера составляет 100 секунд за раз. Под мьютексом TaskPool. В этот момент вся операция становится фактически однопоточной.

  • Диспетчерское сканирование всегда начинается сверху, что вряд ли будет оптимальным. Рассмотрим двух клиентов, пытающихся опубликовать задачу (TaskPool::execute) одновременно.

    За счет мьютекса они будут выполняться последовательно. Первый просканирует полный список dispatchers_ и найдет ближайший доступный слот.

    Даже если предположить «хорошие» обстоятельства, когда нужен только один внутренний цикл (w == 0s), задача другого клиента будет помещена сразу после него, это означает, что первые слоты, которые только что были опробованы (и отклонены, потому что заняты) пробуют снова.

    По сути, TaskPool помешан на контроле, настаивая на том, что они единственные тот, кто может справляться с задачами, но делает плохо и сильно устает быстро, поэтому они делают все более длительные перерывы.

  • В конце концов, TaskPool имеет фиксированную емкость, например, 10 потоков, но для некоторых причина, по которой считается «выгодным» завершать потоки, когда они не очень Занятый. По сути, вы получаете больше накладных расходов, создавая/завершая потоки.

    Напротив, в большинстве операционных систем потоки, ожидающие примитивов синхронизации, не влекут за собой никаких затрат времени выполнения. Просто посмотрите на процесс/поток список любой работающей системы. Прямо сейчас в моей системе работает 1850 LWP («облегченные процессы»). Если бы они не были эффективно запланированы, это бы вообще не сработало.

  • В довершение всего, стол диспетчера никак не упорядочен, это означает, что если элемент изящно завершится, потому что он простаивает, он будет немедленно воссоздан при следующем вызове, независимо от сколько неработающих диспетчеров сидело в списке.

  • В конце концов, TaskPool нарушает ожидания FIFO.

Учитывая все это, я борюсь, в каком сценарии этот подход мог бы быть лучше. в любом отношении, чем более классическая очередь задач, разделенная с идентичным рабочим потоки (которые никогда не завершаются, потому что они не потребляют ресурсы при простое в любом случае):

  • Если не будет полной мощности, очередь НИКОГДА не будет ждать.
  • Даже при полной загрузке ожидание было бы оптимальным, потому что вместо произвольно выбирая диспетчера и ожидая его все более и более долго конкретный поток, который может стать доступным (для... постановки в очередь, даже не выполнение), теперь вы можете просто блокировать ровно до тех пор, пока какой-либо поток не будет удален из очереди задача.
  • В то же время емкость очереди больше не привязана искусственно к количество рабочих потоков

Альтернативный дизайн

Это альтернативная конструкция, рассчитанная на ту же вместимость:

в прямом эфире на Колиру

#define SHORT_DEMO
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <iomanip>
#include <iostream>
#include <thread>
#include <utility>
using namespace std::chrono_literals;

namespace { // diagnostics tracing helpers
    auto now = std::chrono::high_resolution_clock::now;
    static auto timestamp() {
        static auto start = now();
        return (now() - start) / 1.ms;
    }

    static std::atomic_int tid_gen = 0;
    thread_local int       tid     = tid_gen++;
    std::mutex             console_mx;

    void trace(auto const&... args) {
        std::lock_guard lk(console_mx);
        std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
        (std::cout << ... << args) << std::endl;
    }

    template <typename> struct CtxTracer {
        const std::string_view _ctx;
        int const              id = [] {
            static std::atomic_int idgen = 0;
            return ++idgen;
        }();
        void operator()(auto const&... args) const { ::trace(_ctx, " #", id, "\t", args...); }
    };

    #define TRACE_CTX(T) CtxTracer<struct Tag ## T> trace{#T};
} // namespace

namespace {
    // helpers to help replace boost::thread_interrupted with std::stop_token
    template <typename Lockable, typename Duration, typename Predicate>
    bool interruptible_wait_for(std::condition_variable& cv, std::unique_lock<Lockable>& lock,
                                Duration const& dur, std::stop_token stoken, Predicate pred) {

        // see https://stackoverflow.com/a/66309629/85371
        std::stop_callback callback(stoken, [&cv, initial = true, &mx = *lock.mutex()]() mutable {
            if (std::exchange(initial, false)) // while constructing the callback
                return;                        // avoid dead-lock
            mx.lock();
            mx.unlock();
            cv.notify_all();
        });

        cv.wait_for(lock, dur, [&] { return stoken.stop_requested() || pred(); });
        return pred();
    }

    template <typename Duration> // returns true if stop requested
    static bool interruptible_sleep_for(Duration const& dur, std::stop_token stoken) {
        std::mutex              mutex_;
        std::unique_lock lk{mutex_};
        std::condition_variable cv;
        interruptible_wait_for(cv, lk, dur, stoken, std::false_type{});

        return stoken.stop_requested();
    }
} // namespace

struct Task {
    virtual ~Task()                   = default;
    virtual void run(std::stop_token) = 0;
};
class TaskPool {
    TRACE_CTX(TaskPool)
    static constexpr std::chrono::steady_clock::duration externity = 999'999h; // duration::max() gives overflows in some implementations

  public:
    using task_ptr = std::shared_ptr<Task>;

    TaskPool(size_t capacity);
    ~TaskPool() noexcept;

    size_t maxSize()   const { return capacity_;  }
    size_t watermark() const { return watermark_; }

    void execute(task_ptr Task);

  private:
    mutable std::mutex      mutex_;
    std::condition_variable producers_, consumers_; // SEHE combine and `notify_all`?

    size_t const            capacity_;
    std::stop_source        stop_source_;
    std::deque<std::thread> workers_; // workers
    std::deque<task_ptr>    queue_;

    // former Dispatcher implementation
    task_ptr pop() noexcept;
    void     worker_impl(std::stop_token stoken) noexcept;
    size_t watermark_ = 0, invocations_ = 0, executed_ = 0;
};

TaskPool::TaskPool(size_t capacity) : capacity_(capacity) {
    assert(capacity);
    while (capacity--) // assuming same number of workers as queue capacity, for comparability with old design
        workers_.emplace_back(&TaskPool::worker_impl, this, stop_source_.get_token());
}

TaskPool::~TaskPool() noexcept {
    stop_source_.request_stop();
    for (auto& w : workers_)
        if (w.joinable())
            w.join();
}

void TaskPool::execute(task_ptr task) {
    std::unique_lock lock(mutex_);
    if (interruptible_wait_for(producers_, lock, externity, stop_source_.get_token(),
                                 [this] { return queue_.size() < capacity_; })) {
        queue_.push_back(std::move(task));
        consumers_.notify_one();

        invocations_ += 1;
        watermark_ = std::max(watermark_, queue_.size());
    } // else: stop was requested
}

TaskPool::task_ptr TaskPool::pop() noexcept {
    task_ptr task;
    std::unique_lock lock(mutex_);
    if (interruptible_wait_for(consumers_, lock, externity, stop_source_.get_token(),
                               [this] { return !queue_.empty(); })) {
        task.swap(queue_.front());
        queue_.pop_front();
        producers_.notify_one();
    }
    return task;
}

void TaskPool::worker_impl(std::stop_token stoken) noexcept {
    while (auto task = pop())
        try {
            executed_ += 1;
            task->run(stoken);
        } catch (...) { trace("unhandled exception ignored"); }
    trace("worker exit");
}

struct Wrapper : Task {
    virtual void run(std::stop_token stoken) override {
        if (!interruptible_sleep_for(10s, stoken))
            listener.run();
    }

    struct Listener {
        TRACE_CTX(Listener)
        void run() { trace("Hello"); }
    };
    Listener listener;
};

static void Demo(TaskPool& pool) {
    TRACE_CTX(Demo)
    std::stop_source stop;

    // emulated application logic that produces tasks
    auto app_logic = [&pool, stoken = stop.get_token()] {
        TRACE_CTX(app_logic)
        for (unsigned index = 0; !stoken.stop_requested(); ++index) {
            auto s = now();
            pool.execute(std::make_shared<Wrapper>());
            trace("index:", index, " enqueued in ", (now() - s) / 1.s, "s");

            if (index % 20 == 0) {
                trace("taking a break from producing tasks");
                std::this_thread::sleep_for(5s);
            }
        }
        trace("exit app_logic");
    };

    trace("start");
    std::deque<std::thread> threads;
    threads.emplace_back(app_logic);
    threads.emplace_back(app_logic);

#ifdef SHORT_DEMO
    std::this_thread::sleep_for(10s); // (2.5min);
    trace("Requesting shutdown for SHORT_DEMO");
    stop.request_stop();
#endif

    trace("joining app_logic threads");
    for (auto& th : threads)
        th.join();
    trace("joined app_logic threads");
}

int main() {
    TRACE_CTX(Main);

    std::cout << std::setprecision(2) << std::fixed;
    trace("main");

    {
        TaskPool threadPool{10};

        std::thread t1(Demo, std::ref(threadPool));
        std::thread t2(Demo, std::ref(threadPool));

        trace("joining t1..."); t1.join();
        trace("joining t2..."); t2.join();
        trace("awaiting task pool");
    }

    trace("bye");
}

Обратите внимание, что это завершается на целых 10 секунд раньше, несмотря на то, что генерирует такое же количество работы с одинаковым интервалом и с одинаковым количеством рабочих и очередью емкость. Мы потеряли целый тип (Dispatcher) и большую сложность.

Заключение / Резюме

Возможно, я страдал от недостатка воображения, когда думал о грузах, которые воспользоваться преимуществами специфической семантики очередей, представленной в оригинальном дизайне. Однако я перечислил изрядное количество объективных проблем. Также, если конструкция был преднамеренным, я чувствую, что по крайней мере не хватало четкого именования и (само)документация.

Несмотря на это, я надеюсь, что эти два подхода помогут вам в этом. Сравните поведение и выберите то, что лучше для вас.


¹ (слишком много классов не справляются со своей задачей, объединенные классы (Runner и Dispatcher - сиамские близнецы), ненужное использование необработанных указателей, злоупотребление volatile и const_cast...).

Другие вопросы по теме