Кольцевой буфер с доступом только для чтения из читателя

У меня проблема с кольцевым буфером, который я хочу построить, в котором читатель имеет доступ только для чтения. Чтобы добиться плавного ролловера, у меня есть писатель, чтобы установить идентификатор в итераторе + 1 структуры данных ролловера на 0, для чего я проверяю с помощью читателя. Мой алгоритм, кажется, работает нормально до первого переворота, затем по какой-то причине resder будет читать 0 из идентификатора, который, очевидно, установлен писателем. У меня есть компилируемый пример кода, чтобы продемонстрировать проблему прямо здесь:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT     = 0x02,
    BFD_CLR     = 0x03,
    LOS_ACT     = 0x0C
};
typedef struct alarm_s {
    long timestamp;
    int alarmid;
    int arg1;
    int arg2;
}alarm_t;
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t *res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_cond_t cv;
pthread_mutex_t mutex;
int main (void)
{
    int i =0;
    alarm_t dat;
    pthread_t reader;
    int ret;

    roller = calloc(NUM_ALM,sizeof(alarm_t));
    printf("allocated memory: %lukB\n",(sizeof(alarm_t)*NUM_ALM)/1024);

    for (i = 1; i< NUM_ALM; i++){
        alarm_add(LOS_ACT,i,0);
    }
    ret = pthread_create(&reader,NULL,alarm_reader,NULL);
    if (ret){
        printf("Error - pthread_create() return code: %d\n",ret);
        return ERROR;
    }
    sleep(1);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_ACT,8,0);

    pthread_join(reader,NULL);
}

void *alarm_reader(void *arg)
{
    static alarm_t dat = {0};
    int err = 0;
    while(err <= 2)
    {
        if (next_alarm_read(&dat)== OK)
            printf("read alarm id %d, arg1 %d,arg2 %d\n",dat.alarmid,dat.arg1,dat.arg2);
        else{
            printf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");
}
int alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;
    alarm_t dat = {0};
    if (i<NUM_ALM){
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        if (&roller[i]){
            memcpy(&roller[i],&dat,sizeof(alarm_t));
            if (i+1<NUM_ALM)
                roller[i+1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            printf("added id %d, arg1 %d, arg2 %d @%d\n",roller[i].alarmid,roller[i].arg1,roller[i].arg2,i);
            i++;
        }
    } else {
        i = 0;
    }
    return 0;
}

int next_alarm_read(alarm_t *res)
{
    static int i = 0;
    static long prev_time = 0;
    if (!res)
        return ERROR;

    if (i<NUM_ALM)
    {
        if (roller[i].alarmid!=0){
            printf("next_alarm_read() reading @%d\n",i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            prev_time = roller[i].timestamp;
            i++;
        } else {
            printf("next_alarm_read() @%d is %d,return ERROR\n",i,roller[i].alarmid);

            return ERROR;
        }
    } else {
        i = 0;
    }
    return OK;
}

Где выглядит выход:

added id 12, arg1 1, arg2 0 @0
added id 12, arg1 2, arg2 0 @1
added id 12, arg1 3, arg2 0 @2
added id 12, arg1 4, arg2 0 @3
next_alarm_read() reading @0
read alarm id 12, arg1 1,arg2 0
next_alarm_read() reading @1
read alarm id 12, arg1 2,arg2 0
next_alarm_read() reading @2
read alarm id 12, arg1 3,arg2 0
next_alarm_read() reading @3
read alarm id 12, arg1 4,arg2 0
next_alarm_read() @4 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait
added id 2, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
added id 2, arg1 8, arg2 0 @1
added id 3, arg1 8, arg2 0 @2
added id 3, arg1 8, arg2 0 @3
added id 3, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
next_alarm_read() reading @4
read alarm id 3, arg1 8,arg2 0
read alarm id 3, arg1 8,arg2 0
next_alarm_read() reading @0
read alarm id 2, arg1 8,arg2 0
next_alarm_read() @1 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait

нижний отпечаток для next_alarm_read() @1 is 0,return ERROR неправильный, идентификатор должен быть 2. Почему это не работает так, как задумано? Мне интересно?

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
62
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Несколько вопросов:

Я не уверен, что должен делать / иметь в виду if (&roller[i]).

sleep в main на самом деле не нужен, и я подозреваю, что это попытка решить другие проблемы, указанные ниже.

alarm_add сбросит запись в точке опрокидывания.

Кроме того, он может переполнить считыватель и перезаписать записи до того, как считыватель сможет их увидеть (т. Е. Состояние гонки).

Читателю и записывающему устройству необходимо видеть текущие индексы очередей друг друга (т.е.они не должны иметь функциональную область видимости static), чтобы предотвратить переполнение / гонку.

Должны быть переменные условия два, а не одна:

  1. Писатель обнаруживает, что очередь заполнена, и ему необходимо заблокировать, пока читатель не опустошит запись.
  2. Читатель обнаруживает пустую очередь и должен блокировать, пока писатель не добавит новую запись.

Вот отредактированная версия вашего кода, которая должна решить эти проблемы. Я добавил отладочный код. Возможно, он не идеален [и может ошибаться в сторону консерватизма], но он должен продвинуть вас немного дальше [прошу прощения за бесплатную очистку стиля]:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

double tvzero;

//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};

typedef struct alarm_s {
    long timestamp;
    int alarmid;
    int arg1;
    int arg2;
} alarm_t;

void alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// reader variables
pthread_cond_t cv_notempty;             // writer signals when queue not empty
volatile int need_notempty;             // reader sets this before waiting
volatile int idxdeq;                    // reader's queue index

// writer variables
pthread_cond_t cv_notfull;              // reader signals when queue not full
volatile int need_notfull;              // writer sets this before waiting
volatile int idxenq;                    // writer's queue index

volatile int stopall;

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);

    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

#define DBG(_reason) \
    dbg(_reason)

void
dbg(const char *reason)
{
    double tvnow;

    tvnow = tvgetf();
    printf("[%.9f] %s\n",tvnow,reason);
}

int
main(void)
{
    int i = 0;
    pthread_t reader;
    int ret;

    tvzero = tvgetf();

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // NOTE: queuing more than a full queue here will cause writer to block
    // forever because reader is not yet started
    for (i = 1; i < NUM_ALM; i++) {
        alarm_add(LOS_ACT, i, 0);
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        return ERROR;
    }

#if 0
    sleep(1);
#endif

    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_ACT, 8, 0);

    // tell reader that all items are queued and it should stop when it
    // processes the final item
    pthread_mutex_lock(&mutex);
    stopall = 1;
    if (need_notempty)
        pthread_cond_signal(&cv_notempty);
    pthread_mutex_unlock(&mutex);

    pthread_join(reader, NULL);

    return 0;
}

// RETURNS: queue index to process (-1=empty)
int
queue_notempty(void)
{
    int curidx;

    do {
        curidx = idxdeq;

        // queue is empty
        if (curidx == idxenq) {
            curidx = -1;
            break;
        }

        // advance dequeue index
        idxdeq += 1;
        idxdeq %= NUM_ALM;
    } while (0);

    return curidx;
}

// RETURNS: queue index to use (-1=full)
int
queue_notfull(void)
{
    int nxtidx;
    int curidx;

    do {
        // get current index
        curidx = idxenq;

        // advance to next slot (wrapping if necessary)
        nxtidx = curidx;
        nxtidx += 1;
        nxtidx %= NUM_ALM;

        // queue is full
        if (nxtidx == idxdeq) {
            curidx = -1;
            break;
        }

        // store back adjusted index
        idxenq = nxtidx;
    } while (0);

    return curidx;
}

void *
alarm_reader(void *arg)
{
    alarm_t dat = { 0 };

    while (1) {
        if (next_alarm_read(&dat))
            break;
        printf("read alarm id %d, arg1 %d,arg2 %d\n",
            dat.alarmid, dat.arg1, dat.arg2);
    }

    printf("alarm_reader exit!\n");

    return (void *) 0;
}

void
alarm_add(int id, int arg1, int arg2)
{
    int curidx;
    alarm_t *rol;

    pthread_mutex_lock(&mutex);

    while (1) {
        curidx = queue_notfull();

        // have an open slot -- store item into it
        if (curidx >= 0) {
            rol = &roller[curidx];

            rol->timestamp = time(NULL);
            rol->alarmid = id;
            rol->arg1 = arg1;
            rol->arg2 = arg2;

            printf("added id %d, arg1 %d, arg2 %d @%d\n",
                rol->alarmid, rol->arg1, rol->arg2, curidx);

            // unblock reader if necessary
            if (need_notempty) {
                DBG("writer signal notempty");
                need_notempty = 0;
                pthread_cond_signal(&cv_notempty);
            }

            break;
        }

        // queue is full -- wait for reader to free up some space
        DBG("writer need_notfull");
        need_notfull = 1;
        pthread_cond_wait(&cv_notfull,&mutex);
        DBG("writer wakeup");
    }

    pthread_mutex_unlock(&mutex);
}

// RETURNS: 1=stop, 0=normal
int
next_alarm_read(alarm_t *res)
{
    //static long prev_time = 0;
    int curidx;
    alarm_t *rol;
    int stopflg = 0;

    pthread_mutex_lock(&mutex);

    while (1) {
        curidx = queue_notempty();

        // queue has an entry -- process it
        if (curidx >= 0) {
            rol = &roller[curidx];

            printf("next_alarm_read() reading @%d\n", curidx);
            *res = *rol;
            //prev_time = rol->timestamp;

            // if writer is waiting/blocking, wake it up because we just
            // freed up a queue slot
            if (need_notfull) {
                DBG("reader signal notfull");
                need_notfull = 0;
                pthread_cond_signal(&cv_notfull);
            }

            break;
        }

        // stop when master has enqueued everything
        stopflg = stopall;
        if (stopflg)
            break;

        // queue is empty -- we must wait for writer to add something
        DBG("reader need_notempty");
        need_notempty = 1;
        pthread_cond_wait(&cv_notempty,&mutex);
    }

    pthread_mutex_unlock(&mutex);

    return stopflg;
}

Обновлено:

I don't understand the do while(0); "loops" in the two Q functions, can you elaboratea little, please?

do while(0) - это метод, который я часто использую для замены релейной логики if / else. Я не изобретал его [это обсуждается в некоторых руководствах по стилю, в частности, в «Code Complete»], но многим людям, которым я это показал, он, похоже, понравился. См. Мой ответ: Об исключительности случаев блока if для лучшего объяснения.

And I guessx what my initrial post didn't include is: the master should be able to enqueue things on an ongoing basis, there's no stopall and the reader should start reading as soon as something is available.

На самом деле, я, сделал, понимаю, что и код, который я опубликовал, позволяет это.

Вы можете отправить pthread_createперед в очередь любые сообщения, чтобы предотвратить потенциальную взаимоблокировку, о которой я упоминал в комментариях к коду.

A fix for this would be to remove stopall, the pthread_cond-signal() (from main) is already done inside alarm_add() so this should work fine.

stopall - это нет для синхронизации от переполнения / потери значимости. Это просто, если писатель (основной поток) хочет, чтобы получатель / поток закончил и остановился чисто. Это больше похоже на способ отправить читателю условие «EOF».

Если ваше приложение должно работать «вечно», вы можете удалить stopall.

Или более чистый способ сигнализировать «EOF»: основной поток может поставить в очередь специальное «стоп-сообщение» (например, сообщение с отметкой времени -1), чтобы сообщить получателю, что больше сообщений не будет отправляться Когда-либо, и мы хотим завершить программа.


Я предлагаю вам добавить «диагностический режим» для проверки вашей программы:

Попросите main выполнить pthread_create, а затем выполните:

    for (i = 1; i < 10000000; i++) {
        alarm_add(LOS_ACT, i, 0);
    }

Читатель должен проверить входящие значения arg1. Они должны приращение, как указано выше. В противном случае возникает логическая ошибка или состояние гонки.


Вот обновленная версия моего кода с опцией -D для режима диагностики / модульного тестирования. Обратите внимание, что вся печать отключена, чтобы позволить ей работать с экстремальной скоростью:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

int opt_diag;
double tvzero;

//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};

typedef struct alarm_s {
    long timestamp;
    int alarmid;
    int arg1;
    int arg2;
} alarm_t;

void alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// reader variables
pthread_cond_t cv_notempty;             // writer signals when queue not empty
volatile int need_notempty;             // reader sets this before waiting
volatile int idxdeq;                    // reader's queue index

// writer variables
pthread_cond_t cv_notfull;              // reader signals when queue not full
volatile int need_notfull;              // writer sets this before waiting
volatile int idxenq;                    // writer's queue index

volatile int stopall;

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);

    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

#define prtf(_fmt...) \
    do { \
        if (opt_diag) \
            break; \
        printf(_fmt); \
    } while (0)

#define DBG(_reason) \
    dbg(_reason)

void
dbg(const char *reason)
{
    double tvnow;

    if (! opt_diag) {
        tvnow = tvgetf();
        printf("[%.9f] %s\n",tvnow,reason);
    }
}

int
main(int argc,char **argv)
{
    int i = 0;
    char *cp;
    pthread_t reader;
    int ret;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'D':
            cp += 2;
            opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
            break;
        }
    }

    tvzero = tvgetf();

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // NOTE: queuing more than a full queue here will cause writer to block
    // forever because reader is not yet started
    if (! opt_diag) {
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        return ERROR;
    }

#if 0
    sleep(1);
#endif

    if (opt_diag) {
        for (i = 1; i < opt_diag; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }
    else {
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    }

    // tell reader that all items are queued and it should stop when it
    // processes the final item
    pthread_mutex_lock(&mutex);
    stopall = 1;
    if (need_notempty)
        pthread_cond_signal(&cv_notempty);
    pthread_mutex_unlock(&mutex);

    pthread_join(reader, NULL);

    return 0;
}

// RETURNS: queue index to process (-1=empty)
int
queue_notempty(void)
{
    int curidx;

    do {
        curidx = idxdeq;

        // queue is empty
        if (curidx == idxenq) {
            curidx = -1;
            break;
        }

        // advance dequeue index
        idxdeq += 1;
        idxdeq %= NUM_ALM;
    } while (0);

    return curidx;
}

// RETURNS: queue index to use (-1=full)
int
queue_notfull(void)
{
    int nxtidx;
    int curidx;

    do {
        // get current index
        curidx = idxenq;

        // advance to next slot (wrapping if necessary)
        nxtidx = curidx;
        nxtidx += 1;
        nxtidx %= NUM_ALM;

        // queue is full
        if (nxtidx == idxdeq) {
            curidx = -1;
            break;
        }

        // store back adjusted index
        idxenq = nxtidx;
    } while (0);

    return curidx;
}

void *
alarm_reader(void *arg)
{
    alarm_t dat = { 0 };
    static int expval = 1;

    while (1) {
        if (next_alarm_read(&dat))
            break;

        if (opt_diag) {
            if (dat.arg1 != expval) {
                printf("expected: %d got %d\n",expval,dat.arg1);
                exit(1);
            }
            ++expval;
        }

        prtf("read alarm id %d, arg1 %d,arg2 %d\n",
            dat.alarmid, dat.arg1, dat.arg2);
    }

    printf("alarm_reader exit!\n");

    return (void *) 0;
}

void
alarm_add(int id, int arg1, int arg2)
{
    int curidx;
    alarm_t *rol;

    pthread_mutex_lock(&mutex);

    while (1) {
        curidx = queue_notfull();

        // have an open slot -- store item into it
        if (curidx >= 0) {
            rol = &roller[curidx];

            rol->timestamp = time(NULL);
            rol->alarmid = id;
            rol->arg1 = arg1;
            rol->arg2 = arg2;

            prtf("added id %d, arg1 %d, arg2 %d @%d\n",
                rol->alarmid, rol->arg1, rol->arg2, curidx);

            // unblock reader if necessary
            if (need_notempty) {
                DBG("writer signal notempty");
                need_notempty = 0;
                pthread_cond_signal(&cv_notempty);
            }

            break;
        }

        // queue is full -- wait for reader to free up some space
        DBG("writer need_notfull");
        need_notfull = 1;
        pthread_cond_wait(&cv_notfull,&mutex);
        DBG("writer wakeup");
    }

    pthread_mutex_unlock(&mutex);
}

// RETURNS: 1=stop, 0=normal
int
next_alarm_read(alarm_t *res)
{
    //static long prev_time = 0;
    int curidx;
    alarm_t *rol;
    int stopflg = 0;

    pthread_mutex_lock(&mutex);

    while (1) {
        curidx = queue_notempty();

        // queue has an entry -- process it
        if (curidx >= 0) {
            rol = &roller[curidx];

            prtf("next_alarm_read() reading @%d\n", curidx);
            *res = *rol;
            //prev_time = rol->timestamp;

            // if writer is waiting/blocking, wake it up because we just
            // freed up a queue slot
            if (need_notfull) {
                DBG("reader signal notfull");
                need_notfull = 0;
                pthread_cond_signal(&cv_notfull);
            }

            break;
        }

        // stop when master has enqueued everything
        stopflg = stopall;
        if (stopflg)
            break;

        // queue is empty -- we must wait for writer to add something
        DBG("reader need_notempty");
        need_notempty = 1;
        pthread_cond_wait(&cv_notempty,&mutex);
    }

    pthread_mutex_unlock(&mutex);

    return stopflg;
}

Вот версия вашего исходного кода с добавленной опцией диагностики:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

int opt_diag;

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};
typedef struct alarm_s {
    long timestamp;
    int alarmid;
    int arg1;
    int arg2;
} alarm_t;
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_cond_t cv;
pthread_mutex_t mutex;

#define prtf(_fmt...) \
    do { \
        if (opt_diag) \
            break; \
        printf(_fmt); \
    } while (0)

int
main(int argc,char **argv)
{
    int i = 0;
    char *cp;
    pthread_t reader;
    int ret;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'D':
            cp += 2;
            opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
            break;
        }
    }

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    if (! opt_diag) {
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        return ERROR;
    }

    if (opt_diag) {
        for (i = 1; i < opt_diag; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }
    else {
        sleep(1);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    }

    pthread_join(reader, NULL);
}

void *
alarm_reader(void *arg)
{
    static alarm_t dat = { 0 };
    int expval = 1;
    int err = 0;

    while (err <= 2) {
        if (next_alarm_read(&dat) == OK) {
            prtf("read alarm id %d, arg1 %d,arg2 %d\n", dat.alarmid, dat.arg1, dat.arg2);
            if (opt_diag) {
                if (dat.arg1 != expval) {
                    printf("expected: %d got %d\n",expval,dat.arg1);
                    exit(1);
                }
                ++expval;
            }
        }
        else {
            prtf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");

    return (void *) 0;
}

int
alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;
    alarm_t dat = { 0 };
    if (i < NUM_ALM) {
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        if (&roller[i]) {
            memcpy(&roller[i], &dat, sizeof(alarm_t));
            if (i + 1 < NUM_ALM)
                roller[i + 1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            prtf("added id %d, arg1 %d, arg2 %d @%d\n", roller[i].alarmid, roller[i].arg1, roller[i].arg2, i);
            i++;
        }
    }
    else {
        i = 0;
    }
    return 0;
}

int
next_alarm_read(alarm_t * res)
{
    static int i = 0;
    //static long prev_time = 0;

    if (!res)
        return ERROR;

    if (i < NUM_ALM) {
        if (roller[i].alarmid != 0) {
            prtf("next_alarm_read() reading @%d\n", i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            //prev_time = roller[i].timestamp;
            i++;
        }
        else {
            prtf("next_alarm_read() @%d is %d,return ERROR\n", i, roller[i].alarmid);

            return ERROR;
        }
    }
    else {
        i = 0;
    }
    return OK;
}

Ух ты! Большое спасибо! Позвольте мне немного поразмышлять над этим! Первое, что я нашел, это: PTHREAD_MUTEX_INITIALIZER - я не знаю об этом, круто!

stdcerr 18.06.2018 18:11

Я не понимаю "петель" do while(0); в двух функциях Q, не могли бы вы немного уточнить, пожалуйста?

stdcerr 18.06.2018 18:34

И я думаю, что в моем первоначальном посте не было: мастер должен иметь возможность ставить вещи в очередь на постоянной основе, нет stopall, и читатель должен начать читать, как только что-то станет доступным. Исправление для этого - удалить stopall, pthread_cond-signal() (из основного) уже сделан внутри alarm_add(), так что это должно работать нормально.

stdcerr 18.06.2018 18:59

do while(0) - это метод, который я часто использую для замены релейной логики if/else. Я не изобретал это, но многим людям, которым я показал это, кажется, это нравится. См. Мой ответ: stackoverflow.com/a/50685506/5382650 для лучшего объяснения

Craig Estey 18.06.2018 23:14

Другие вопросы по теме