Как выполнять функции в теме синхронно?

Проблема, которую я хочу решить, заключается в том, что я хочу запустить асинхронное действие из нескольких потоков, но разрешено выполнение только одного действия за раз.

Действие заключается в обмене данными с аппаратным устройством, и устройство может обрабатывать только один запрос за раз.

Одна из моих идей заключалась в том, чтобы синхронизировать это с System.Reactive.Subjects.Subject. Несколько потоков могли вызывать OnNext, и субъект должен был выполнять один запрос за другим. Я написал этот (возможно, очень наивный) код:

static void Main(string[] args)
{
    var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();

    source
        // from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
        .SelectMany(async x => await x.Invoke()) 
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }

    Console.ReadKey();
}

static async Task<int> doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    await Task.Delay(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}

Я надеялся, что вывод будет таким:

Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed

Вместо этого я получаю:

Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed

Это показывает, что все действия запускаются с самого начала и не нужно ждать завершения других. Интересно, Reactive это правильный способ сделать это или есть какой-то другой умный способ выполнить мою задачу.

Редактировать: чтобы дать больше справочной информации, зачем мне это нужно: Приложение взаимодействует с устройством. Это устройство имеет последовательный интерфейс и может обрабатывать только одну команду за раз. Итак, у меня есть поток, который постоянно получает обновления статуса, например:

while (true)
{
    ReadPosition();
    ReadTempereatures();
    ReadErrors();
}

Затем есть пользовательский интерфейс, где пользователи могут инициировать какое-либо действие на устройстве. Мое текущее решение - это очередь, в которой я ставлю в очередь свои команды. Это работает, но мне было интересно, будет ли работать событийный подход.

Зачем вам писать многопоточный код, если вы хотите, чтобы это происходило последовательно? Конечно, вы можете добавить в свой цикл ожидание завершения потока.

BugFinder 13.05.2019 11:03

Но почему? Я имею в виду ... решение может быть таким же простым, как изменение количества потоков материала Task ... но все же, почему вы хотите это сделать? Не могли бы вы предоставить более подробную информацию здесь, пожалуйста? Поскольку правильное решение полностью зависит от того, какова конкретная цель

X39 13.05.2019 11:04

Один поток используется для постоянного извлечения данных (информации о состоянии) с устройства, а другой поток — это пользовательский интерфейс, в котором пользователи могут запускать некоторые действия.

ChristianMurschall 13.05.2019 11:09

Но зачем вам те, которые точно обрабатываются по одному?

X39 13.05.2019 11:10

Спасибо за ваш вопрос! Ваш интерес очень приветствуется. Я написал правку, чтобы решить эту проблему.

ChristianMurschall 13.05.2019 11:23

Ну... краткий ответ - нет, вам нужно обработать этот сериал или (в качестве альтернативы) использовать мьютексы. Очередь, вероятно, является лучшим решением, поскольку она является наиболее надежной (только один поток, о котором нужно беспокоиться). Однако вы можете изменить это на что-то, что позволяет упростить создание (например, специальный объект, автоматически добавляющийся в очередь и выполняющий функцию)

X39 13.05.2019 11:32

Вы смешиваете Rx, Tasks и многопоточность. Неудивительно, что он сходит с рельсов. Выбирайте на подходе - Rx лучший, ИМХО - и все будет хорошо.

Enigmativity 13.05.2019 13:34
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
7
124
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы смешиваете Rx, Tasks и многопоточность. Неудивительно, что он сходит с рельсов. Выберите один подход - Rx лучший, ИМХО - и все будет в порядке.

Достаточно ли этого:

static void Main(string[] args)
{
    var source = new Subject<Func<int>>();

    source
        .Synchronize()
        .Select(x => x())
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }

    Console.ReadLine();
}

static int doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    Thread.Sleep(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}

Это дает:

Start work 0
Stop work 0
Work of index 0 completed
Start work 2
Stop work 2
Work of index 2 completed
Start work 1
Stop work 1
Work of index 1 completed

Ключевым моментом является вызов .Synchronize(), чтобы привести все вызывающие потоки в соответствие с контрактом Rx.

Я знаю, что это большая путаница :). Результат грамотно то, что я ищу! Спасибо за совет с Synchronize(). Мой код в doWork представляет собой операцию асинхронного запроса, но я думаю, в этом случае я не выиграю, поэтому я мог бы выполнить рефакторинг для синхронной версии.

ChristianMurschall 13.05.2019 14:36

@ChristianMurschall Вместо рефакторинга просто подождите.

Felix Keil 17.05.2019 08:36

Спасибо за отличное предложение! Сейчас я использую BlockingCollection, который немного лучше подходит для моего варианта использования.

ChristianMurschall 17.05.2019 13:18

Другие вопросы по теме