Я хотел бы создать набор из N потоков под управлением исходного процесса. Я хотел бы управлять ими, как в этом псевдокоде:
create_n_threads();
While(1) {
main task modifies a global variable "phase" to control the type of work
trigger the N threads to wake up and do work based on the global "phase" variable
wait until all threads have completed their tasks
main task does meta-calculation on the partial results of all the workers
}
Я попробовал pthread_barrier_wait(). Он хорошо работает для запуска вычислительного цикла, но не дает мне возможности узнать, когда завершена каждая задача. Как мне узнать, что все потоки завершены, чтобы можно было безопасно выполнить метарасчет результата? Я не хочу использовать pthread_join, потому что эти рабочие циклы будут замкнуты в цикле, и мне не нужны накладные расходы на уничтожение и воссоздание задач в каждом цикле.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h> // for sleep()
#define NTHREADS 4
pthread_barrier_t b;
typedef struct threadargs {
int id; // thread ID 0-\>N
int phase; // either 0 or non zero to set blk/red phase
} THREADARGS;
int phase=0;
int cycle=0;
// worker function
// gets called with a pointer to THREADARGS structure
// which tells worker their id, starting, ending column to relax, and the red/black phase
void *thread_func(void *x)
{
int tid; // thread id
int *retval = (int *) malloc(sizeof(int)); // so it persists across thread death
tid = ((THREADARGS *) x)->id;
while(1) { // wait to be triggered
printf("%d: %d %d\n", cycle, tid, phase);
pthread_barrier_wait(&b);
}
*retval = 2*tid;
pthread_exit((void *)retval);
}
int main(int argc, char *argv[])
{
pthread_t threadids[NTHREADS]; // store os thread ids
THREADARGS thread_args[NTHREADS]; // arguments to thread
int rc, i;
// initialize the multiprocess barrier
pthread_barrier_init(&b, NULL, NTHREADS+1);
/* spawn the threads */
for (i = 0; i < NTHREADS; ++i) {
thread_args[i].id = i;
printf("spawning thread %d\n", i);
if ((rc=pthread_create(&threadids[i], NULL, thread_func, (void *) &thread_args[i]))!=0) {
fprintf(stderr, "cannot create thread %d\n",i);
exit(8);
};
}
for (i=0; i<10; i++) { // do ten iterations
printf("cycle %d\n", i);
phase=(phase+1)%3;
cycle++;
pthread_barrier_wait(&b); // trigger all the workers and wait for all to complete
}
exit(2); // just kill everything
}
В этом примере выводятся такие результаты:
!) pthread
spawning thread 0
spawning thread 1
spawning thread 2
spawning thread 3
0: 0 0
0: 1 0
cycle 0
1: 2 1
1: 3 1
1: 3 1
1: 1 1
1: 2 1
1: 0 1
cycle 1
cycle 2
3: 1 0
3: 3 0
3: 2 0
3: 0 0
3: 0 0
3: 2 0
3: 3 0
cycle 3
3: 1 0
4: 1 1
4: 2 1
4: 0 1
4: 3 1
Вы можете видеть, что некоторые рабочие процессы запускаются несколько раз за цикл и что переменная «фаза» не учитывается должным образом от цикла к циклу. Я хочу что-то вроде:
cycle 1
1: 0 0
1: 1 0
1: 2 0
1: 3 0
cycle 2
2: 0 1
2: 1 1
2: 2 1
2: 2 1
cycle 3
3: 0 2
...
Конечно, операторы печати каждой задачи будут зашифрованы, но я хочу запустить все 4 потока pthread для выполнения задачи «0,1,2» и чтобы все они завершились, чтобы я мог работать с их результатами и безопасно изменять глобальные переменные для следующий цикл.
Извините, я просто вырезал и вставил свой пример. Я не знаю подходящей идиомы для добавления кода при переполнении стека. Я не видел никакого механизма для просмотра моей файловой системы в поисках файла кода.
Откуда вы взяли этот пример, содержащий все эти escape-последовательности? Если вы просто скопируете и вставите свой собственный код в окно браузера, все должно работать нормально.
Я отредактировал исходный код, удалив escape-последовательности. Он был опубликован путем вырезания и вставки из Linux Xterm в браузер Firefox виртуальной машины Windows. Прошу прощения, что не заметил добавленных символов.
Вы можете использовать cond var.
int done = 0;
int work = 0;
int working = 0;
int waiting = n;
Работник:
pthread_mutex_lock( &mutex );
while ( 1 ) {
// Wait for work to arrive.
while ( !work && !done )
pthread_cond_wait( &cond, &mutex );
if ( !work && done )
break;
--work
--waiting;
++working;
pthread_cond_signal( &cond );
pthread_mutex_unlock( &mutex );
// Do work.
pthread_mutex_lock( &mutex );
--working;
++waiting;
pthread_cond_signal( &cond );
}
pthread_mutex_unlock( &mutex );
Основной:
pthread_mutex_lock( &mutex );
while ( 1 ) {
work = n;
pthread_cond_signal( &cond );
// Wait for the workers to complete the work.
while ( work || working )
pthread_cond_wait( &cond, &mutex );
// Process result.
}
done = 1;
pthread_cond_broadcast( &cond );
pthread_mutex_unlock( &mutex );
// Join worker threads here.
Обратите внимание, что можно добавить любой объем работы. Меньше n
работа — это нормально. Даже больше, чем n
получится.
См. также: Потокобезопасная очередь, поддерживающая несколько производителей и потребителей
Ваш код действительно помог мне понять переменные cond. Как только я понял, что оба потока должны заблокировать переменную, и она разблокируется при вызове pthread_cond_wait(), это начало обретать смысл. Этот подход очень мощный, и я могу использовать его в будущем. Для моей нынешней проблемы достаточно ожидания с двойным барьером, и его легче понять. Спасибо за вашу помощь!
ОПИСАНИЕ
Вызывающий поток блокируется до тех пор, пока не будет вызвано необходимое количество потоков
pthread_barrier_wait()
с указанием барьера.Когда необходимое количество потоков вызвало
pthread_barrier_wait()
с указанием барьера, константаPTHREAD_BARRIER_SERIAL_THREAD
должна быть возвращена одному неуказанному потоку, а ноль должен быть возвращен каждому из оставшихся потоков.ВОЗВРАЩАЕМОЕ ЗНАЧЕНИЕ
После успешного завершения функция
pthread_barrier_wait()
должна вернутьPTHREAD_BARRIER_SERIAL_THREAD
для одного (произвольного) потока, синхронизированного на барьере, и ноль для каждого из остальных потоков. В противном случае должен быть возвращен номер ошибки, указывающий на ошибку.
Поэтому:
int res = pthread_barrier_wait(barrier);
if (res == PTHREAD_BARRIER_SERIAL_THREAD) {
//only one single (arbitrary) thread will reach this point
//and only if all other threads have reached the barrier
} else {
//all others will see this part
}
С двумя барьерами вы можете сделать следующее:
int done = 0;
void* thread_func(void *arg) {
while (!done) {
//wait until all threads have signal to go
pthread_barrier_wait(start_barrier);
//all green, lets do the work ...
//as last operation, store the result in a dedicated global variable
//as long as only this thread will access it, no need to
//protect it with a mutex
//after work is done, wait until all the others are done too
pthread_barrier_wait(stop_barrier);
}
}
int main()
{
//all the init stuff (inclusive thread creation)
do {
//do preparations before the threads starts doing their work
//set global vars etc.
//e.g. done = 1;
//all threads are waiting for the last thread to get going
//give green signal
pthread_barrier_wait(start_barrier);
//while the threads doing their job
pthread_barrier_wait(stop_barrier);
//at this point, each task has done its job
//and it is guaranteed, that all data will no longer
//accessed by any thread, therefore no protection needed
//do the main calculation
} while (!done);
}
Подход с двойным барьером является самым простым решением. Я не рассматривал возможность использования двух разных барьерных переменных и боялся использовать одну и ту же дважды и каким-то образом потерять синхронизацию. Использование разных переменных кажется надежным, и теперь мой код работает правильно. Спасибо!
Проблема в том, что как только потоки преодолели барьер, они хотят продолжить свою работу, но не могут продолжать, пока не будут выполнены основные вычисления (и не подготовятся данные для следующей итерации). Невозможно сделать это с помощью одного барьера. Конечно, использование мьютекса и условной переменной может решить эту проблему, но разве не именно это делает операция ожидания барьера?
Ваш код полон escape-символов, что является недопустимым синтаксисом C.