Проблема, которую я хочу решить, заключается в том, что я хочу запустить асинхронное действие из нескольких потоков, но разрешено выполнение только одного действия за раз.
Действие заключается в обмене данными с аппаратным устройством, и устройство может обрабатывать только один запрос за раз.
Одна из моих идей заключалась в том, чтобы синхронизировать это с System.Reactive.Subjects.Subject
. Несколько потоков могли вызывать OnNext
, и субъект должен был выполнять один запрос за другим. Я написал этот (возможно, очень наивный) код:
static void Main(string[] args)
{
var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();
source
// from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
.SelectMany(async x => await x.Invoke())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadKey();
}
static async Task<int> doWork(int index)
{
Console.WriteLine($"Start work {index}");
await Task.Delay(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
Я надеялся, что вывод будет таким:
Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed
Вместо этого я получаю:
Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed
Это показывает, что все действия запускаются с самого начала и не нужно ждать завершения других. Интересно, Reactive
это правильный способ сделать это или есть какой-то другой умный способ выполнить мою задачу.
Редактировать: чтобы дать больше справочной информации, зачем мне это нужно: Приложение взаимодействует с устройством. Это устройство имеет последовательный интерфейс и может обрабатывать только одну команду за раз. Итак, у меня есть поток, который постоянно получает обновления статуса, например:
while (true)
{
ReadPosition();
ReadTempereatures();
ReadErrors();
}
Затем есть пользовательский интерфейс, где пользователи могут инициировать какое-либо действие на устройстве. Мое текущее решение - это очередь, в которой я ставлю в очередь свои команды. Это работает, но мне было интересно, будет ли работать событийный подход.
Но почему? Я имею в виду ... решение может быть таким же простым, как изменение количества потоков материала Task
... но все же, почему вы хотите это сделать? Не могли бы вы предоставить более подробную информацию здесь, пожалуйста? Поскольку правильное решение полностью зависит от того, какова конкретная цель
Один поток используется для постоянного извлечения данных (информации о состоянии) с устройства, а другой поток — это пользовательский интерфейс, в котором пользователи могут запускать некоторые действия.
Но зачем вам те, которые точно обрабатываются по одному?
Спасибо за ваш вопрос! Ваш интерес очень приветствуется. Я написал правку, чтобы решить эту проблему.
Ну... краткий ответ - нет, вам нужно обработать этот сериал или (в качестве альтернативы) использовать мьютексы. Очередь, вероятно, является лучшим решением, поскольку она является наиболее надежной (только один поток, о котором нужно беспокоиться). Однако вы можете изменить это на что-то, что позволяет упростить создание (например, специальный объект, автоматически добавляющийся в очередь и выполняющий функцию)
Вы смешиваете Rx, Tasks и многопоточность. Неудивительно, что он сходит с рельсов. Выбирайте на подходе - Rx лучший, ИМХО - и все будет хорошо.
Вы смешиваете Rx, Tasks и многопоточность. Неудивительно, что он сходит с рельсов. Выберите один подход - Rx лучший, ИМХО - и все будет в порядке.
Достаточно ли этого:
static void Main(string[] args)
{
var source = new Subject<Func<int>>();
source
.Synchronize()
.Select(x => x())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadLine();
}
static int doWork(int index)
{
Console.WriteLine($"Start work {index}");
Thread.Sleep(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
Это дает:
Start work 0 Stop work 0 Work of index 0 completed Start work 2 Stop work 2 Work of index 2 completed Start work 1 Stop work 1 Work of index 1 completed
Ключевым моментом является вызов .Synchronize()
, чтобы привести все вызывающие потоки в соответствие с контрактом Rx.
Я знаю, что это большая путаница :). Результат грамотно то, что я ищу! Спасибо за совет с Synchronize()
. Мой код в doWork
представляет собой операцию асинхронного запроса, но я думаю, в этом случае я не выиграю, поэтому я мог бы выполнить рефакторинг для синхронной версии.
@ChristianMurschall Вместо рефакторинга просто подождите.
Спасибо за отличное предложение! Сейчас я использую BlockingCollection
, который немного лучше подходит для моего варианта использования.
Зачем вам писать многопоточный код, если вы хотите, чтобы это происходило последовательно? Конечно, вы можете добавить в свой цикл ожидание завершения потока.