Периодически запускать рабочие потоки pthread и ждать завершения

Я хотел бы создать набор из N потоков под управлением исходного процесса. Я хотел бы управлять ими, как в этом псевдокоде:

create_n_threads();

While(1) {
    main task modifies a global variable "phase" to control the type of work
    trigger the N threads to wake up and do work based on the global "phase" variable
    wait until all threads have completed their tasks
    main task does meta-calculation on the partial results of all the workers
}

Я попробовал pthread_barrier_wait(). Он хорошо работает для запуска вычислительного цикла, но не дает мне возможности узнать, когда завершена каждая задача. Как мне узнать, что все потоки завершены, чтобы можно было безопасно выполнить метарасчет результата? Я не хочу использовать pthread_join, потому что эти рабочие циклы будут замкнуты в цикле, и мне не нужны накладные расходы на уничтожение и воссоздание задач в каждом цикле.

#include <stdio.h>
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h>     // for sleep()

#define NTHREADS 4
pthread_barrier_t b;

typedef struct threadargs {
int id;             // thread ID 0-\>N
int phase;          // either 0 or non zero to set blk/red phase
} THREADARGS;

int phase=0;
int cycle=0;

// worker function
// gets called with a pointer to THREADARGS structure
// which tells worker their id, starting, ending column to relax, and the red/black phase

void *thread_func(void *x)
{
int tid;                    // thread id

    int *retval = (int *) malloc(sizeof(int));   // so it persists across thread death
    tid = ((THREADARGS *) x)->id;
    
    while(1) {                  // wait to be triggered
        printf("%d: %d %d\n", cycle, tid, phase);
        pthread_barrier_wait(&b);
    }
    
    *retval = 2*tid;
    pthread_exit((void *)retval);

}

int main(int argc, char *argv[])
{
pthread_t threadids[NTHREADS];      // store os thread ids
THREADARGS thread_args[NTHREADS];           // arguments to thread
int rc, i;

    // initialize the multiprocess barrier 
    pthread_barrier_init(&b, NULL, NTHREADS+1);
    
    /* spawn the threads */
    for (i = 0; i < NTHREADS; ++i) {
        thread_args[i].id = i;
        printf("spawning thread %d\n", i);
        if ((rc=pthread_create(&threadids[i], NULL, thread_func, (void *) &thread_args[i]))!=0) {
            fprintf(stderr, "cannot create thread %d\n",i);
            exit(8);
        };
    }
    
    for (i=0; i<10; i++) {              // do ten iterations
        printf("cycle %d\n", i);
        phase=(phase+1)%3;
        cycle++;
        pthread_barrier_wait(&b);       // trigger all the workers and wait for all to complete
    }
    
    exit(2);    // just kill everything

}

В этом примере выводятся такие результаты:

!) pthread
spawning thread 0
spawning thread 1
spawning thread 2
spawning thread 3
0: 0 0
0: 1 0
cycle 0
1: 2 1
1: 3 1
1: 3 1
1: 1 1
1: 2 1
1: 0 1
cycle 1
cycle 2
3: 1 0
3: 3 0
3: 2 0
3: 0 0
3: 0 0
3: 2 0
3: 3 0
cycle 3
3: 1 0
4: 1 1
4: 2 1
4: 0 1
4: 3 1

Вы можете видеть, что некоторые рабочие процессы запускаются несколько раз за цикл и что переменная «фаза» не учитывается должным образом от цикла к циклу. Я хочу что-то вроде:

cycle 1
1: 0 0
1: 1 0
1: 2 0
1: 3 0
cycle 2
2: 0 1
2: 1 1
2: 2 1
2: 2 1
cycle 3
3: 0 2
...

Конечно, операторы печати каждой задачи будут зашифрованы, но я хочу запустить все 4 потока pthread для выполнения задачи «0,1,2» и чтобы все они завершились, чтобы я мог работать с их результатами и безопасно изменять глобальные переменные для следующий цикл.

Ваш код полон escape-символов, что является недопустимым синтаксисом C.

paddy 01.03.2024 01:14

Извините, я просто вырезал и вставил свой пример. Я не знаю подходящей идиомы для добавления кода при переполнении стека. Я не видел никакого механизма для просмотра моей файловой системы в поисках файла кода.

rickwalker 01.03.2024 03:42

Откуда вы взяли этот пример, содержащий все эти escape-последовательности? Если вы просто скопируете и вставите свой собственный код в окно браузера, все должно работать нормально.

Gerhardh 01.03.2024 10:44

Я отредактировал исходный код, удалив escape-последовательности. Он был опубликован путем вырезания и вставки из Linux Xterm в браузер Firefox виртуальной машины Windows. Прошу прощения, что не заметил добавленных символов.

rickwalker 01.03.2024 22:20
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
4
72
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы можете использовать cond var.

int done = 0;
int work = 0;
int working = 0;
int waiting = n;

Работник:

pthread_mutex_lock( &mutex );

while ( 1 ) {
   // Wait for work to arrive.
   while ( !work && !done )
      pthread_cond_wait( &cond, &mutex );

   if ( !work && done )
      break;

   --work
   --waiting;
   ++working;
   pthread_cond_signal( &cond );

   pthread_mutex_unlock( &mutex );

   // Do work.

   pthread_mutex_lock( &mutex );

   --working;
   ++waiting;
   pthread_cond_signal( &cond );
}

pthread_mutex_unlock( &mutex );

Основной:

pthread_mutex_lock( &mutex );

while ( 1 ) {
   work = n;
   pthread_cond_signal( &cond );

   // Wait for the workers to complete the work.
   while ( work || working )
      pthread_cond_wait( &cond, &mutex );

   // Process result.
}

done = 1;
pthread_cond_broadcast( &cond );

pthread_mutex_unlock( &mutex );

// Join worker threads here.

Обратите внимание, что можно добавить любой объем работы. Меньше n работа — это нормально. Даже больше, чем n получится.


См. также: Потокобезопасная очередь, поддерживающая несколько производителей и потребителей

Ваш код действительно помог мне понять переменные cond. Как только я понял, что оба потока должны заблокировать переменную, и она разблокируется при вызове pthread_cond_wait(), это начало обретать смысл. Этот подход очень мощный, и я могу использовать его в будущем. Для моей нынешней проблемы достаточно ожидания с двойным барьером, и его легче понять. Спасибо за вашу помощь!

rickwalker 01.03.2024 22:26
Ответ принят как подходящий

Из pthread_barrier_wait

ОПИСАНИЕ

Вызывающий поток блокируется до тех пор, пока не будет вызвано необходимое количество потоков pthread_barrier_wait() с указанием барьера.

Когда необходимое количество потоков вызвало pthread_barrier_wait() с указанием барьера, константа PTHREAD_BARRIER_SERIAL_THREAD должна быть возвращена одному неуказанному потоку, а ноль должен быть возвращен каждому из оставшихся потоков.

ВОЗВРАЩАЕМОЕ ЗНАЧЕНИЕ

После успешного завершения функция pthread_barrier_wait() должна вернуть PTHREAD_BARRIER_SERIAL_THREAD для одного (произвольного) потока, синхронизированного на барьере, и ноль для каждого из остальных потоков. В противном случае должен быть возвращен номер ошибки, указывающий на ошибку.

Поэтому:

int res = pthread_barrier_wait(barrier);
if (res == PTHREAD_BARRIER_SERIAL_THREAD) {
    //only one single (arbitrary) thread will reach this point
    //and only if all other threads have reached the barrier
} else {
   //all others will see this part
}

С двумя барьерами вы можете сделать следующее:

int done = 0;

void* thread_func(void *arg) {

    while (!done) {

        //wait until all threads have signal to go
        pthread_barrier_wait(start_barrier);

        //all green, lets do the work ...
        //as last operation, store the result in a dedicated global variable
        //as long as only this thread will access it, no need to
        //protect it with a mutex

        //after work is done, wait until all the others are done too
        pthread_barrier_wait(stop_barrier);

    }

}

int main()
{
    //all the init stuff (inclusive thread creation)

    do {
        
        //do preparations before the threads starts doing their work
        //set global vars etc.
        //e.g. done = 1;

        //all threads are waiting for the last thread to get going
        //give green signal
        pthread_barrier_wait(start_barrier);

        //while the threads doing their job
        pthread_barrier_wait(stop_barrier);

        //at this point, each task has done its job
        //and it is guaranteed, that all data will no longer
        //accessed by any thread, therefore no protection needed

        //do the main calculation

    } while (!done);

}

Подход с двойным барьером является самым простым решением. Я не рассматривал возможность использования двух разных барьерных переменных и боялся использовать одну и ту же дважды и каким-то образом потерять синхронизацию. Использование разных переменных кажется надежным, и теперь мой код работает правильно. Спасибо!

rickwalker 01.03.2024 22:23

Проблема в том, что как только потоки преодолели барьер, они хотят продолжить свою работу, но не могут продолжать, пока не будут выполнены основные вычисления (и не подготовятся данные для следующей итерации). Невозможно сделать это с помощью одного барьера. Конечно, использование мьютекса и условной переменной может решить эту проблему, но разве не именно это делает операция ожидания барьера?

Erdal Küçük 01.03.2024 23:08

Другие вопросы по теме