Я хочу иметь функцию, которая получает Task<bool> и запускает ее в задачах X.
Для этого я написал следующий код:
public static class RetryComponent
{
public static async Task RunTasks(Func<Task<bool>> action, int tasks, int retries, string method)
{
// Running everything
var tasksPool = Enumerable.Range(0, tasks).Select(i => DoWithRetries(action, retries, method)).ToArray();
await Task.WhenAll(tasksPool);
}
private static async Task<bool> DoWithRetries(Func<Task<bool>> action, int retryCount, string method)
{
while (true)
{
if (retryCount <= 0)
return false;
try
{
bool res = await action();
if (res)
return true;
}
catch (Exception e)
{
// Log it
}
retryCount--;
await Task.Delay(200); // retry in 200
}
}
}
И следующий код выполнения:
BlockingCollection<int> ints = new BlockingCollection<int>();
foreach (int i in Enumerable.Range(0, 100000))
{
ints.Add(i);
}
ints.CompleteAdding();
int taskId = 0;
var enumerable = new AsyncEnumerable<int>(async yield =>
{
await RetryComponent.RunTasks(async () =>
{
try
{
int myTaskId = Interlocked.Increment(ref taskId);
// usually there are async/await operations inside the while loop, this is just an example
while (!ints.IsCompleted)
{
int number = ints.Take();
Console.WriteLine($"Task {myTaskId}: {number}");
await yield.ReturnAsync(number);
}
}
catch (InvalidOperationException)
{
return true;
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
return true;
}, 10, 1, MethodBase.GetCurrentMethod().Name);
});
await enumerable.ForEachAsync(number =>
{
Console.WriteLine(number);
});
где AsyncEnumerable происходит от System.Collections.Async.
В консоли отображается Task 10: X (где x — число в списке..).
Когда я удаляю AsyncEnumerable, все работает как задумано (все задачи печатаются и выполнение заканчивается)..
По какой-то причине, которую я не могу найти в течение длительного времени, использование AsyncEnumerable просто все портит (в моем основном коде мне нужно использовать AsyncEnumerable... масштабируемость...), что означает, что код никогда не останавливается и только последняя задача (10 ) печатает. когда я добавил больше журналов, я вижу, что задачи 1-9 никогда не заканчиваются.
Итак, чтобы прояснить ситуацию, я хочу, чтобы несколько задач выполняли асинхронные операции и выдавали результаты одному объекту AsyncEnumerable, который действует как канал. (это была идея..)
потому что, когда его нет в AsyncEnumerable, все работает по назначению, и все задачи печатаются (а не только задача 10). Кроме того, эта задача не закончится, потому что задачи 1-9 никогда не заканчиваются по причине, которую я не знаю
Итак, при использовании AsyncEnumerable он печатает только Task 10: {number}? Пожалуйста, помните, что формулируя вопросы, только вы понимаете контекст, стоящий за этим, лучше всего описать проблему и ожидаемое/предполагаемое решение, чтобы помочь нам понять.
Я думаю, что сделал это более ясным сейчас.
@Selvin, да, я написал параллель по ошибке, я не это имел в виду
Все еще не очень понимаю, каков ожидаемый результат, следует ли перейти к Task 11 и Task 12? Вы указываете 10 задач для запуска, но ваш BlockingCollection имеет 100 000 элементов, внутри которых полностью обрабатывается ваш код, последняя задача выбирает все остальные элементы, полученные из ints. Можете ли вы привести примеры того, как это выглядит с неасинхронным перечислением и каково желаемое поведение?
Все 10 задач должны отображать числа. Это довольно прямолинейно из кода. Задача просто берет число и печатает его. Проблема в том, что когда я использую AsyncEnumerable, задачи 1-9 просто ничего не отображают. Как будто они сброшены. также выполнение не остановится, как будто задачи все еще выполняются, но ничего не делают
Поэтому я запускал код локально, задачи 1-9 находятся в самом верху командного окна.
Хорошо. Так почему же только задание 10 печатает результаты? Потому что, когда вы не используете IAsyncEnumerable, они также будут печатать результаты
Прошу прощения, я прокомментировал это сегодня во время обеденного перерыва. Посмотрев, я все еще озадачен тем, чего именно вы пытаетесь достичь с помощью различных фрагментов логики. Из локального запуска я вижу, что ваши первые 9 задач заблокированы, а 10 подхватывают остальные. Вы пытаетесь создать очередь pub-sub?
@ColinM представьте себе asyncEnumerable как канал, а задачи внутри как производители. они производят и отправляют данные, которые они делают, через конвейер. Проблема в том, что если вы запускаете программу, работает только последняя задача, а другая не завершается. если вы прокомментируете обертку «var enumerable = ...", вы увидите, что все задачи работают, как задумано, и программа завершает выполнение. с оберткой на asyncEnumerable (канал) работает только последняя задача, а остальные никогда не заканчиваются
Я не знаком с AsyncEnumerable, спасибо за описание контекста его выполнения. Что я обнаружил с этим кодом, так это то, что когда я вместо этого использовал foreach на ints без использования Take, он мог правильно выполнять итерацию задач, что заставило меня поверить, что использование BlockingCollection и Take блокирует / блокирует ваш первый задачи, пока они ждут, пока очередь снова станет доступной. Код также никогда не проходит дальше блока ForEachAsync.
@ColinM как? я изменил его, чтобы взять элемент из списка (вместо blockingCollection) и те же результаты.
Console.WriteLine после yield.ReturnAsync, чтобы он печатал Task {taskId}: {number} только после выполнения задания.





Проблема в том, что шаблон перечислителя/генератора является последовательным, но вы пытаетесь создать шаблон с несколькими производителями и одним потребителем. Поскольку вы используете вложенные анонимные функции, а переполнение стека не показывает номера строк, сложно точно описать, на какую часть кода я ссылаюсь, но я все равно попробую.
Принцип работы AsyncEnumerable заключается в том, чтобы дождаться, пока производитель произведет значение, затем дождаться, пока потребитель использует это значение, а затем повторить. Он не поддерживает производитель и потребитель, работающие с разной скоростью, поэтому я говорю, что этот шаблон является последовательным. У него нет очереди произведенных предметов, только текущее значение. ReturnAsync не ждет, чтобы потребитель использовал значение, вместо этого вы должны ждать задачи, которую он возвращает, что дает вам сигнал о том, что она готова. Поэтому мы можем сделать вывод, что это не потокобезопасно.
Однако RetryComponent.RunTasks выполняет 10 задач параллельно, и этот код вызывает yield.ReturnAsync, не проверяя, не вызывал ли его кто-либо еще, и если да, то завершена ли эта задача. Поскольку класс Yield хранит только текущее значение, ваши 10 одновременных задач перезаписывают текущее значение, не дожидаясь готовности объекта Yield для нового значения, поэтому 9 задач теряются и никогда не ожидаются. Поскольку эти 9 задач никогда не ожидаются, методы никогда не завершатся, и Task.WhenAll никогда не вернется, как и другие методы во всем стеке вызовов.
Я создал вопрос на github предлагает им улучшить свою библиотеку, чтобы в таких случаях возникали исключения. Если они это реализуют, ваш блок catch выведет сообщение на консоль и повторно выдаст ошибку, поставив задачу в состояние сбоя, что позволит task.WhenAll завершиться и, следовательно, ваша программа не зависнет.
Вы можете использовать многопоточные API-интерфейсы синхронизации, чтобы гарантировать, что только одна задача за раз вызывает yield.ReturnAsync и ожидает возврата задачи. Или вы можете избежать использования шаблона с несколькими производителями, поскольку один производитель может легко быть перечислителем. В противном случае вам придется полностью переосмыслить, как вы хотите реализовать шаблон с несколькими производителями. Я предлагаю Поток данных TPL, который встроен в .NET Core и доступен в .NET Framework в виде пакета NuGet.
я, черт возьми, знал, что с этим что-то не так.. я проверял свой код больше раз, чем могу сосчитать..
@zivkan абсолютно прав насчет последовательного шаблона производителя. Если вы хотите иметь параллельных производителей для одного потока, это все еще можно реализовать с помощью библиотеки AsyncEnumerable, но для этого потребуется дополнительный код.
Вот пример решения проблемы с параллельными производителями и потребителями (в данном случае только с одним потребителем):
static void Main(string[] args)
{
var e = new AsyncEnumerable<int>(async yield =>
{
var threadCount = 10;
var maxItemsOnQueue = 20;
var queue = new ConcurrentQueue<int>();
var consumerLimiter = new SemaphoreSlim(initialCount: 0, maxCount: maxItemsOnQueue + 1);
var produceLimiter = new SemaphoreSlim(initialCount: maxItemsOnQueue, maxCount: maxItemsOnQueue);
// Kick off producers
var producerTasks = Enumerable.Range(0, threadCount)
.Select(index => Task.Run(() => ProduceAsync(queue, produceLimiter, consumerLimiter)));
// When production ends, send a termination signal to the consumer.
var endOfProductionTask = Task.WhenAll(producerTasks).ContinueWith(_ => consumerLimiter.Release());
// The consumer loop.
while (true)
{
// Wait for an item to be produced, or a signal for the end of production.
await consumerLimiter.WaitAsync();
// Get a produced item.
if (queue.TryDequeue(out var item))
{
// Tell producers that they can keep producing.
produceLimiter.Release();
// Yield a produced item.
await yield.ReturnAsync(item);
}
else
{
// If the queue is empty, the production is over.
break;
}
}
});
e.ForEachAsync((item, index) => Console.WriteLine($"{index + 1}: {item}")).Wait();
}
static async Task ProduceAsync(ConcurrentQueue<int> queue, SemaphoreSlim produceLimiter, SemaphoreSlim consumerLimiter)
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
await Task.Delay(10);
var value = rnd.Next();
await produceLimiter.WaitAsync(); // Wait for the next production slot
queue.Enqueue(value); // Produce item on the queue
consumerLimiter.Release(); // Notify the consumer
}
}
Как это все портит?