Синхронизируйте три потока в C++ один за другим

Это продолжение действительно хорошего: https://stackoverflow.com/a/33500554/22598822. Этот пост был посвящен последовательности (t1 и t2) -> t3. Допустим, у меня запущена программа, и есть три потока, которые я хочу сохранить во внутренней последовательности выполнения t1 --> t2 --> t3 (другие потоки могут выполняться или не выполняться в системе одновременно). Как это можно сделать?

Я взял код из ссылки выше и попытался переработать его для этой цели, но безуспешно. Моя реализация неверна, я привожу ее здесь для минимального воспроизводимого примера и, возможно, в качестве возможной отправной точки.

Заголовок.ч:

#include<thread>
#include<mutex>
#include<iostream>
#include <condition_variable>

Мультикласс.h

#include "Header.h"
#include "SynchObj.h"

class MultiClass {
public:
    void Run() {
        std::thread t1(&MultiClass::Calc1, this);
        std::thread t2(&MultiClass::Calc2, this);
        std::thread t3(&MultiClass::Calc3, this);
        t1.join();
        t2.join();
        t3.join();
    }
private:
    SyncObj obj;
    void Calc1() {
        for (int i = 0; i < 10; ++i) {
            obj.waitForCompletionOfT3();
            std::cout << "T1:" << i << std::endl;
            obj.signalCompletionOfT1();
        }           
    }
    void Calc2() {
        for (int i = 0; i < 10; ++i) {
            obj.waitForCompletionOfT1();
            std::cout << "T2:" << i << std::endl;
            obj.signalCompletionOfT2();
        }
    }
    void Calc3() {      
        for (int i = 0; i < 10; ++i) {
            obj.waitForCompletionOfT2();
            std::cout << "T3:" << i << std::endl;
            obj.signalCompletionOfT3();
        }       
    }
};

SynchObj.h

#include "Header.h"

class SyncObj {
    std::mutex mux;
    std::condition_variable cv;  
    bool completed[3]{ false, false, false };

public:

    /***** Original (t1 & t2) --> t3 *****/
    /*
    void signalCompetionT1T2(int id) {
        std::lock_guard<std::mutex> ul(mux);
        completed[id] = true;
        cv.notify_all();
    }
    void signalCompetionT3() {
        std::lock_guard<std::mutex> ul(mux);
        completed[0] = false;
        completed[1] = false;
        cv.notify_all();
    }
    void waitForCompetionT1T2() {
        std::unique_lock<std::mutex> ul(mux);             
        cv.wait(ul, [&]() {return completed[0] && completed[1]; });         
    }
    void waitForCompetionT3(int id) {
        std::unique_lock<std::mutex> ul(mux);         
        cv.wait(ul, [&]() {return !completed[id]; });           
    }
    */       
    /***********************************/
    
    /*** Unsuccessful attempt at t1 --> t2 --> t3 ***/

    void signalCompletionOfT1() {
        std::lock_guard<std::mutex> ul(mux);
        completed[0] = true;
        cv.notify_all();
    }

    void signalCompletionOfT2() {
        std::lock_guard<std::mutex> ul(mux);
        completed[0] = false;
        completed[1] = true;
        cv.notify_all();
    }

    void signalCompletionOfT3() {
        std::lock_guard<std::mutex> ul(mux);
        completed[0] = false;
        completed[1] = false;
        completed[2] = true;
        cv.notify_all();
    }

   void waitForCompletionOfT1() {
        std::unique_lock<std::mutex> ul(mux);             
        cv.wait(ul, [&]() {return !completed[2]; });         
    }

    void waitForCompletionOfT2() {
        std::unique_lock<std::mutex> ul(mux);             
        cv.wait(ul, [&]() {return !completed[0]; });         
    }

    void waitForCompletionOfT3() {
        std::unique_lock<std::mutex> ul(mux);         
        cv.wait(ul, [&]() {return !completed[1]; });           
    }

};

Источник.cpp:

#include "Header.h"
#include "MultiClass.h"

int main() {
    MultiClass m;
    m.Run();
    return 0;
}

Возможный результат №1 (желаемый):

T1:1
T1:2
T1:3
T1:4
T1:5
T1:6
T1:7
T1:8
T1:9
T2:0
T2:1
T2:2
T2:3
T2:4
T2:5
T2:6
T2:7
T2:8
T2:9
T3:0
T3:1
T3:2
T3:3
T3:4
T3:5
T3:6
T3:7
T3:8
T3:9

Возможный результат №2:

0
T2:1
T2:2
T2:3
T2:4
T2:5
T2:6
T2:7
T2:8
T2:9

Обычно, когда мне приходится последовательно выполнять многопоточную обработку данных, я использую кольцевые/циклические буферы для передачи данных из одного потока в другой. Буферы позволяют вам проверить, есть ли там данные для обработки или нет, тем самым предоставляя вам механизм синхронизации. Однако убедитесь, что доступ к памяти защищен, чтобы избежать состояний гонки. Кажется, у Boost есть библиотека, которая делает это: boost.org/doc/libs/1_45_0/libs/circular_buffer/doc/…

Valdez 26.07.2024 19:22

Похоже, вам вообще не нужна многопоточность, так зачем же усложнять ситуацию?

user4581301 26.07.2024 19:26

Если вы думаете о задачах, которые хотите запланировать, а не о потоках, для которых вы хотите их запланировать, у вас есть три строго последовательных шага, и вы спрашиваете, как добавить некоторую стоимость синхронизации/переключения контекста между ними. Как и просили, это ни к чему не приводит...

Useless 26.07.2024 19:38

@user4581301 user4581301 Я понимаю, почему ты это говоришь, попробую уточнить. Возможно, я что-то упускаю. У меня получилось 3 функции: первая редактирует текстовый файл. Второй использует system() для запуска исполняемого файла, который принимает этот текстовый файл в качестве входных данных. Третий считывает данные, созданные этим исполняемым файлом. Я пытался просто запустить 3 строки кода одну за другой, не получилось. Я предположил, что последовательная согласованность не сохраняется (?). Я решил запустить их как потоки, что позволит мне определять порядок выполнения.

barak 26.07.2024 19:42

@barak - судя по вашему описанию задачи, нет причин для отдельных тем. Опубликуйте код, который «не работает», и опишите, что он делает. «Сделай это, затем сделай это, затем сделай что-нибудь еще» по своей сути является однопоточным.

Pete Becker 26.07.2024 19:51

«Это не сработало» — это бесполезная информация. Вам нужно расследовать, что же произошло на самом деле. Когда вы поймете, ПОЧЕМУ «это не сработало[,]», вы сможете добиться прогресса. Если вы не хотите, чтобы эти три шага выполнялись одновременно (и, возможно, пара других эзотерических возможностей), многопоточность — неправильный выбор. Вы склоняетесь к отладке с помощью дробовика, внося в код неопровержимые изменения для решения проблемы, и это обычно вызывает больше проблем, чем решает.

user4581301 26.07.2024 19:59

Да, это классика: «У меня было N проблем, поэтому я добавил потоки, и теперь у меня 1+N! недетерминированных проблем». Если вы не понимаете первоначальную проблему, темы не помогут.

Useless 26.07.2024 21:03

Предположение, что однопоточный пример выполняется не по порядку, является большим шагом вперед по сравнению с идеей о том, что ваши шаги выполняются неправильно.

sweenish 26.07.2024 21:21

Во-первых, я ценю ответы ваших ребят. Я был бы рад улучшить свои методы, поэтому я здесь и благодарю вас за комментарии. Что касается публикации кода, это проект из нескольких сотен строк кода, я потратил несколько дней, пытаясь предоставить на сайте минимальные воспроизводимые примеры, отражающие мою проблему. Я думаю, что на данный момент это достигло конечной точки. Я бы предпочел не ссылаться на свой github, если он вообще адекватен. Правда, «не работает» никого никуда не ведет. Я принимаю эти комментарии. Далее я исследую, почему мой код работает неправильно, отлаживая версию без потоков.

barak 26.07.2024 21:46

Обычно ссылка на GitHub ни к чему не приведет. Слишком мало людей нажмут на ссылку, чтобы помочь вам сейчас, а в будущем ссылка будет заблокирована брандмауэрами или чем-то еще, сгниет и не будет полезна будущим программистам. MRE – это правильный путь. Обычно сделать MRE проще, если код модульный. Если все, что вам нужно сделать для создания MRE, — это выбрать одну функцию и добавить main для ее вызова, у вас, вероятно, не возникнет особых проблем с выяснением того, что пошло не так, самостоятельно, но если вы это сделаете, кто-нибудь здесь будет способен помочь довольно быстро, если проблема не является тонкой.

user4581301 26.07.2024 21:56

Возможно, вам захочется пересмотреть способ организации операторов включения. Header.h включает в себя SyncObj.h. SyncObj.h включает Header.h. Во время компиляции в каждом файле будет несколько определений Header.h и SyncObj.h.

Jim Rogers 26.07.2024 21:57

Следуя комментариям, я отладил глубже. Обнаружена ошибка в коде моделирования с открытым исходным кодом, который принимает входные данные, запускается и выдает выходные данные. Неинициализированный буфер символов[2] приводил к неправильному использованию sscanf() всякий раз, когда второй байт инициализировался цифрой. Это приводило к случайной (~ 1 из 20) ошибке сегментации → симуляция запускалась, но получала SIGSEGV → новый выходной файл не был создан → существующий выходной файл был прочитан → что ошибочно казалось проблемой синхронизации в программе чтения . В заключение: я быстро заподозрил последовательную несогласованность из-за незначительной ошибки в коде, который, по моему мнению, работал.

barak 27.07.2024 17:19

Я оставляю модераторам право решать, останется ли этот пост. Я думаю, это должно

barak 27.07.2024 17:19
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
13
105
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Использование семафоров было бы более простым для этого приложения. Следующая программа выводит возможный результат №1:

Скомпилировано с помощью g++ 12.2.1 с помощью командной строки: g++ -std=c++20 -pthread MultiClass.cpp.

Обновлено: как отмечали другие, было бы лучше делать этот однопоточный процесс.

Мультикласс.cpp

#include<thread>
#include<mutex>
#include<iostream>
#include<semaphore>

class MultiClass {
public:
    void Run() {
        std::thread t1(&MultiClass::Calc1, this);
        std::thread t2(&MultiClass::Calc2, this);
        std::thread t3(&MultiClass::Calc3, this);
        t1.join();
        t2.join();
        t3.join();
    }
private:
    std::binary_semaphore t1{1};
    std::binary_semaphore t2{0};
    std::binary_semaphore t3{0};

    void Calc1() {
        t1.acquire();
        for (int i = 0; i < 10; ++i) {
            std::cout << "T1:" << i << std::endl;
        }
        t2.release();
    }
    void Calc2() {
        t2.acquire();
        for (int i = 0; i < 10; ++i) {
            std::cout << "T2:" << i << std::endl;
        }
        t3.release();
    }
    void Calc3() {
        t3.acquire();
        for (int i = 0; i < 10; ++i) {
            std::cout << "T3:" << i << std::endl;
        }
    }
};


int main() {
    MultiClass m;
    m.Run();
    return 0;
}

Рекомендуйте внести изменения, чтобы было ясно, что это худшее решение по сравнению с сохранением последовательности этапов без использования потоков вообще.

user4581301 26.07.2024 23:23

Я принимаю ответ, потому что это рабочее решение поставленного мною вопроса. Это хорошее решение плохого вопроса :) См. комментарии выше и комментарий пользователя @user4581301 здесь.

barak 27.07.2024 17:19

Если вы хотите запускать одновременно и более одного раза, вам нужно подумать, как имитировать канал Unix/Linux, где выходные данные одного процесса становятся входными данными для следующего процесса. Когда канал пуст, процесс чтения приостанавливается. Когда канал заполнен, процесс записи приостанавливается. В многопоточности это разновидность модели производитель-потребитель.

Jim Rogers 28.07.2024 04:35

Это расширенный комментарий, а не ответ.

Ваш SyncObj класс мог бы быть проще. Экземпляр SyncObj имеет три возможных состояния, которые вы представляете с помощью трех отдельных переменных bool. В электротехнике этот шаблон известен как горячее кодирование. В разработке программного обеспечения (насколько мне известно) никто не дал этому названию, потому что почти никто его не использует.

Если экземпляр имеет три возможных состояния, самый простой способ представить это состояние — с помощью переменной int, которой можно присвоить значение 0, 1 или 2, или с помощью переменной enum, имеющей три возможных значения. В вашем случае вы всегда хотите, чтобы состояния развивались в одной и той же последовательности, поэтому числовое представление, вероятно, будет более естественным. Что-то вроде этого, например:

#include "Header.h"

constexpr int NUMPARTICIPANTS=3;

class SyncObj {
    std::mutex mux;
    std::condition_variable cv;  
    int whoseTurn=0;

public:

    void awaitMyTurn(int myID) {
        std::lock_guard<std::mutex> ul(mux);
        cv.wait(ul, [&]() {return whoseTurn == myID; });         
    }

    void signalNext() {
        std::lock_guard<std::mutex> ul(mux);
        whoseTurn = (whoseTurn + 1) % NUMPARTICIPANTS;
        cv.notify_all();
    }
};

Используйте это следующим образом:

    void Calc2() {
        for (int i = 0; i < 10; ++i) {
            obj.awaitMyTurn(2);
            std::cout << "T2:" << i << std::endl;
            obj.signalNext();
        }
    }

Вы могли бы придумать это, сделав NUMPARTICIPANTS аргументом конструктора вместо того, чтобы сделать его глобальной константой, выдав ошибку из awaitMyTurn, если заданный myID вышел за пределы и т. д.

Другие вопросы по теме