Как узнать, когда std::thread завершился в С++?

Я только учусь использовать std::thread в С++ 11. По сути, у меня есть длинный список данных (представьте себе цикл for между 0-15000) и 1568 потоков в оборудовании, которое я использую. Я хочу, чтобы каждый образец обрабатывался отдельным потоком. Я понимаю, как создать первые 1568 потоков, он отлично работает. Но как только я достиг образца N_thread+1, я хочу проверить, есть ли какие-либо доступные потоки. Если есть, отправьте этот образец данных в этот поток. Каждый поток отправляется в функцию, заблокированную мьютексом, которая разблокируется в конце. Возможно, я неправильно понял, как работают потоки, и не могу делать это таким образом? Или, возможно, есть лучшая библиотека назначения потоков/ЦП, которая может помочь?

Как я уже сказал, я могу добраться до точки, где потоки 1568 создаются, запускаются и присоединяются, и конечные результаты хороши. Просто нужно немного больше информации.

это мой главный

int main(){
  cout<<"In main"<<endl;
  CSVReaderUpdatedStructure reader("data.csv");
  vector<STMDataPacket> DataList = reader.GetData();

  thread_pool Pool(THREAD_COUNT);
  auto startT0 = chrono::high_resolution_clock::now();
   for(unsigned s=0; s<DataList.size()-1; s++){
      cout<<"analysing sample "<<s<<endl;
      auto done = Pool.add_task([s= s, Sample= DataList[s], t_inf = time_info,wf=writefile, f=factor]{GetDMWPulses(s, Sample, t_inf, wf,f);});
      done.wait();

    }

  auto stop = chrono::high_resolution_clock::now();
  cout<<"pulses "<<pulses.size()<<endl;
  auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0); 
  cout <<"time for MWD full process = "<< duration.count() <<" microseconds "<< endl;

  return 0;

}

Если вы не говорите об очень специализированном оборудовании, 1568 потоков, вероятно, слишком много. Кроме того, вам нужен пул потоков для того, чего вы хотите достичь. std::thread может быть строительным блоком для пула потоков, но одного этого будет недостаточно.

SergeyA 12.07.2019 20:37

То, что вы хотите, звучит как пул потоков. Вы можете сделать его самостоятельно или использовать библиотеку, чтобы получить его. Если вы погуглите пул потоков, вы получите много информации.

NathanOliver 12.07.2019 20:38

Создание и уничтожение потоков занимает много времени. Как правило, вы хотите использовать пул потоков, который не требует создания/уничтожения потоков и просто использует одни и те же потоки. VS использует этот метод реализации std::async и будет использовать ресурсы ЦП гораздо эффективнее, хотя могут быть проблемы с thread_local, поскольку их трудно правильно обрабатывать с помощью пулов потоков.

doug 12.07.2019 21:08

В std::thread нет ничего, чтобы проверить, сколько потоков поддерживает ваше оборудование. Вам нужен API для конкретной платформы или сторонняя библиотека, которая его обертывает (например, TBB).

n. 1.8e9-where's-my-share m. 12.07.2019 22:02

Re: «Каждый поток отправляется в заблокированную функцию мьютекса, которая разблокируется в конце». Это звучит почти так, как будто вы хотите, чтобы вся работа, выполняемая потоком, выполнялась с заблокированным мьютексом. Но это означало бы, что в любой момент времени не может работать более одного потока, что звучит так, как будто вы хотите нет.

Solomon Slow 12.07.2019 23:07

Если вы пытаетесь выполнить многопроцессорность (т. е. использовать несколько потоков для ускорения вычислений на многопроцессорной машине), вам необходимо минимизировать количество времени, в течение которого любой данный поток удерживает любой заданный мьютекс заблокированным. См. en.wikipedia.org/wiki/Амдал%27s_law

Solomon Slow 12.07.2019 23:11

Похоже, использование потоков для этого является неправильным решением проблемы. Вы должны использовать пул потоков только с таким количеством потоков, сколько у вас есть ядер (если, к счастью, у вас нет 1568 ядер на вашей машине).

Martin York 13.07.2019 01:46
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
7
220
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы, вероятно, не хотите 1568 потоков. Возможно, вы хотите 1568+ задач.

Вероятно, вам нужен пул потоков. TBB имеет пул потоков и доступен почти на каждой платформе.

Написать собственный пул потоков не так сложно. Вот набросок одного из них:

template<class T>
struct threadsafe_queue {
  optional<T> pop() {
    auto l = lock();
    cv.wait( l, [&]{
      return abort || !data.empty();
    });
    if (abort) return {};
    T retval = std::move(data.front());
    data.pop();
    return retval;
  }
  void push( T in ) {
    auto l = lock();
    data.push( std::move(in) );
    cv.notify_one();
  }
  void abort_queue() {
    auto l = lock();
    abort = true;
    cv.notify_all();
  }
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::queue<T> data;
  bool abort = false;

  std::unique_lock<std::mutex> lock() const {
    return std::unique_lock<std::mutex>(m);
  }
};

struct thread_pool {
  template<class F, class R=typename std::decay< typename std::result_of< F&() >::type>::type>
  auto add_task( F&& f )
  -> std::future< R >
  {
     std::packaged_task<R()> task( std::forward<F>(f) );
     auto retval = task.get_future();
     tasks.push( std::packaged_task<void()>(std::move(task)) );
     return retval;
  }

  void start_thread( std::size_t N=1 )
  {
    if (shutdown) return;
    for (std::size_t i = 0; i < N; ++i)
    {
      threads.emplace_back( [this]{
        while (true)
        {
          if (shutdown) return;
          auto task = tasks.pop();
          if (!task)
            return;
          (*task)();
        }
      } );
    }
  }
  void cleanup() {
    shutdown = true;
    tasks.abort_queue();
    for (auto&& t:threads)
      t.join();
    threads.clear();
  }
  ~thread_pool() {
    cleanup();
  }

  thread_pool():thread_pool( std::thread::hardware_concurrency() ) {}
  explicit thread_pool( std::size_t N ) {
    start_thread(N);
  }
private:
  threadsafe_queue<std::packaged_task<void()>> tasks;
  std::vector<std::thread> threads;
  std::atomic<bool> shutdown = false;
};

теперь создайте thread_pool.

Запихайте туда задачи. Получите фьючерсы.

Попросите рабочие задачи увеличить std::atomic<unsigned int> и подождите, пока он достигнет максимума, или сделайте что-нибудь более интересное.

struct counting_barrier {
  explicit counting_barrier( std::size_t n ):count(n) {}
  void operator--() {
    --count;
    if (count <= 0)
    {
       std::unique_lock<std::mutex> l(m);
       cv.notify_all();
    }
  }
  void wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait( l, [&]{ return count <= 0; } );
  }
private:
  std::mutex m;
  std::condition_variable cv;
  std::atomic<std::ptrdiff_t> count = 0;
};

Создайте counting_barrier barrier( 15000 ) или что-то подобное. Потоки после завершения могут --barrier (это потокобезопасно). Основной поток может barrier.wait(), и он будет разбужен, когда будет вызвано 15000 --.

В приведенном выше коде могут быть опечатки, но дизайн на высоте. Для промышленного использования вам также понадобится более совершенная процедура отключения.

Живой пример.

Если у вас нет опционального или опционального повышения, используйте это:

template<class T>
struct optional {
  T* get() { return static_cast<T*>( static_cast<void*>( & data ) ); };
  T const* get() const { return static_cast<T*>( static_cast<void*>( & data ) ); };

  T& operator*() & { return *get(); }
  T&& operator*() && { return std::move(*get()); }
  T const& operator*() const & { return *get(); }
  T const&& operator*() const&& { return std::move(*get()); }

  explicit operator bool() const { return engaged; }
  bool has_value() const { return (bool)*this; }
  template< class U >
  T value_or( U&& default_value ) const& {
    if (*this) return **this;
    return std::forward<U>(default_value);
  }
  template< class U >
  T value_or( U&& default_value ) && {
    if (*this) return std::move(**this);
    return std::forward<U>(default_value);
  }

  optional(T const& t) {
    emplace(t);
  }
  optional(T&& t) {
    emplace(std::move(t));
  }
  optional() = default;
  optional(optional const& o) {
    if (o) {
      emplace( *o );
    }
  }
  optional(optional && o) {
    if (o) {
      emplace( std::move(*o) );
    }
  }
  optional& operator=(optional const& o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = *o;
    } else {
      emplace( *o );
    }
    return *this;
  }
  optional& operator=(optional && o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = std::move(*o);
    } else {
      emplace( std::move(*o) );
    }
    return *this;
  }
  template<class...Args>
  T& emplace(Args&&...args) {
    if (*this) reset();
    ::new( static_cast<void*>(&data) ) T(std::forward<Args>(args)...);
    engaged = true;
    return **this;
  }
  void reset() {
    if (*this) {
      get()->~T();
      engaged = false;
    }
  }
  ~optional() { reset(); }
private:
  using storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
  bool engaged = false;
  storage data;
};

обратите внимание, что этот необязательный параметр не является промышленным; Я буквально написал это и не проверял. В нем отсутствуют многие функции промышленной прочности, которые есть у настоящего дополнительного оборудования. Но вы можете заменить его реальным необязательным параметром и получить почти такое же или лучшее поведение, поэтому его можно использовать, если он вам не нужен.

counting_barrier barrier(100);
thread_pool p(10);
for (int i = 0; i < 100; ++i)
{
  p.add_task([&barrier,i]{
    std::stringstream ss;
    ss << i << ",";
    std::cout << ss.str();
    --barrier;
  });
}
barrier.wait();
std::cout << "\n";
auto done1 = p.add_task([]{ std::cout << "hello" << std::endl; });
done1.wait();
auto done2 = p.add_task([]{ std::cout << "world" << std::endl; });
done2.wait();

«Вы не хотите 1568 потоков». Возможно, ты не хочет 1568 потоков. Я тестировал машины с большим количеством аппаратных потоков. Кажется, у OP есть один из них.

n. 1.8e9-where's-my-share m. 12.07.2019 22:04

@н.м. Конечно. И если я ошибаюсь, остальная часть ответа все еще работает и решает их проблему. :)

Yakk - Adam Nevraumont 12.07.2019 22:07

Привет, возвращаясь к этому сейчас - если я хочу добавить функцию, например. void Finder(int x, float y) в очередь, как мне это сделать в основной функции в другом месте?

smid 17.07.2019 15:43
add_task( [x=x_expression, y=y_expression]{ Finder(x,y); } ), где x_expression — это выражение, которое вы запускаете, чтобы вычислить то, что вы хотите для x, а y_expression — это выражение, которое вы запускаете, чтобы вычислить то, что вы хотите для y. То есть: add_task( [x=-1, y=3.14]{ Finder(x,y); } )
Yakk - Adam Nevraumont 17.07.2019 16:12

спасибо, я попытался запустить код, он не позволил мне использовать "atomics": ошибка: использование удаленной функции âstd::atomic<bool>::atomic(const std::atomic<bool>&)â, поэтому я попытался без, но это вызвало: завершение, вызванное после создания экземпляра 'std::future_error', завершение, вызванное рекурсивно

smid 17.07.2019 16:54

@smid Ну, живой пример работает. Было около 2 опечаток, которые я должен был исправить.

Yakk - Adam Nevraumont 17.07.2019 17:10

хорошо, да, теперь это работает. Я до сих пор не уверен, как управлять этим для большего количества потоков. Я еще не установил счетный барьер, будет ли он определен в основной функции и увеличен в функции многопоточности или только в основной?

smid 17.07.2019 18:01

На данный момент кажется, что количество потоков достигает «N», затем приостанавливается… я предполагаю, что это «ожидание», но я не думаю, что там есть что-то, что могло бы переназначить потоки следующему образцу для анализа, как только они освободятся. ..если это имеет смысл. Итак, если у меня есть, например, 50 потоков, и я хочу проанализировать 100 образцов, первые 50 легко представить, а затем, как только этот поток будет завершен, я хочу, чтобы образец 51 был выполнен в этом потоке... я думаю, что-то вроде нужен счетный барьер

smid 17.07.2019 18:11

@smid Исправлено. Мне нужна была опция, чтобы сделать это чисто, а вы хотели С++ 11. Поэтому я написал необязательный, или вы можете использовать boost:: необязательный. Я забыл цикл while, и для чистого завершения работы мне нужно было добавить возможность прерывания очереди, а для чистого прерывания очереди я хотел опционально вернуться из pop. :)

Yakk - Adam Nevraumont 17.07.2019 18:57

спасибо, кажется, теперь работает без сбоев и проходит через все образцы. Затраченное время, однако, похоже, существенно не меняется, независимо от того, сколько потоков я выбираю .... возможно, просто мой код нельзя улучшить с помощью многопоточности, но я чувствую, что должны быть некоторые изменения

smid 17.07.2019 20:19

Я понятия не имею, какое у вас оборудование. Так что не могу посоветовать.

Yakk - Adam Nevraumont 17.07.2019 20:30

lscpu: Архитектура: x86_64 Режимы работы ЦП: 32-битный, 64-битный Порядок байтов: Процессоры с прямым порядком байтов: 56 Список процессоров в сети: 0-55 Поток на ядро: 2 ядра на сокет: 14 сокетов: 2 узла (узлов) NUMA: 2 идентификатор производителя: семейство ЦП GenuineIntel: 6 модель: 79 название модели: ЦП Intel(R) Xeon(R) E5-2680 v4 @ 2,40 ГГц Степпинг: 1 ЦП МГц: 2423,343 ЦП макс. МГц: 3300,0000 ЦП мин. МГц: 1200,0000

smid 17.07.2019 20:44

Ах. У вас нехватка пропускной способности памяти, как догадка. Разделите данные так, чтобы данный поток обрабатывал кучу последовательных данных, вместо того, чтобы получать случайные фрагменты тут и там. Вы также можете посмотреть в своих спецификациях, сколько МБ/с может обрабатывать ваша оперативная память, и сравнить ее со скоростью, с которой вы решаете проблему; это может сказать вам, есть ли возможности для дальнейшего повышения производительности с увеличением вычислительных ресурсов или нет.

Yakk - Adam Nevraumont 17.07.2019 20:53

ах, да, я думаю, вы правы, я только что сделал «верх» и вижу, что использую весь ЦП %.... хотя % памяти не превышает 5%. Я думаю, мне нужно уменьшить мой алгоритм, если я хочу улучшить время или изменить его, как вы предлагаете.

smid 17.07.2019 21:25

Если вы используете весь ЦП и мало памяти, то есть надежда. Разбейте данные на ~ 4500 частей (скажем, чуть менее чем в 3 раза больше количества потоков). Попросите выполнить целую часть (более одной «линии» или «ряда»). Посмотрим, станет ли это лучше.

Yakk - Adam Nevraumont 17.07.2019 21:31

Я пытаюсь создать 3 задачи, как вы упомянули. У меня была игра с параметрами «fork()», но это привело к некоторым запутанным результатам. Я не хочу родительско-дочерней структуры... я думаю, это не лучший способ запустить ее. Я могу посмотреть на «htop» и увидеть, что с текущим кодом (тот, что описан выше) используется 400 задач на. Я попытался сделать несколько разветвлений, чтобы увидеть, как это изменилось, и я вижу, что это создает дополнительные задачи, чего я и хочу, но, возможно, есть лучший способ? Я вижу много вариантов с С++ 17, я думаю, может быть, перейти на него, но я бы предпочел использовать то, что у меня есть, если смогу.

smid 19.07.2019 16:22

@smid Я не понимаю, почему вы вызываете форк. У вас есть 15000 элементов. И ~1500 потоков. Таким образом, каждый поток будет обрабатывать ~ 10 элементов. Возможно, какие-то элементы будут быстрее/медленнее. Решение 1: каждый поток обрабатывает ~10 элементов подряд. Решение 2: каждый поток обрабатывает ~3-4 элемента подряд, затем возвращается и берет другую задачу из очереди, выполняя ~3 раза. Текущее решение: каждый поток обрабатывает 1 элемент, затем возвращается назад, выполняясь примерно 10 раз. Я говорю, что мы можем уменьшить конкуренцию (в очереди) и улучшить локальность памяти (тот же поток, смежный), выполняя задачи в больших пакетах.

Yakk - Adam Nevraumont 19.07.2019 16:28
done.wait(); -- который ждет, пока задача не будет выполнена. Вы делаете это перед тем, как начнете вторую задачу. Затем вы ждете, пока это будет сделано, прежде чем начинать третью задачу. Это кажется плохим планом.
Yakk - Adam Nevraumont 19.07.2019 16:48

@smid: добавьте свое редактирование / уточнение к своему вопросу, а не к ответу

Solarflare 19.07.2019 16:48

Я добавил свой основной для обсуждения. Этот список данных представляет собой 15000 образцов. У меня сейчас 1500 потоков для простоты. Итак, я должен добавить в цикл/в то время как где-то, чтобы отправить 10 образцов в add_task в этот момент, а затем увеличить до следующих 10?

smid 19.07.2019 16:51

Я добавил пример правильного использования counting_barrier и выполнения 100 задач в 10 потоках в ответе выше, включая ссылку на живой пример. @smid редактировать ответ с ответом неправильно. На самом деле, на этом этапе рассмотрите возможность задать еще один вопрос, используя кнопку [задать вопрос].

Yakk - Adam Nevraumont 19.07.2019 16:58

да, это была ошибка, я хотел добавить код к вопросу. Я попробую это сейчас и посмотрю, как все обернется. Спасибо.

smid 19.07.2019 17:09

Другие вопросы по теме