C# AsyncEnumerable, выполняющий/ожидающий несколько задач, никогда не завершается

Я хочу иметь функцию, которая получает Task<bool> и запускает ее в задачах X.

Для этого я написал следующий код:

public static class RetryComponent
{
    public static async Task RunTasks(Func<Task<bool>> action, int tasks, int retries, string method)
    {
        // Running everything
        var tasksPool = Enumerable.Range(0, tasks).Select(i => DoWithRetries(action, retries, method)).ToArray();
        await Task.WhenAll(tasksPool);
    }

    private static async Task<bool> DoWithRetries(Func<Task<bool>> action, int retryCount, string method)
    {
        while (true)
        {
            if (retryCount <= 0)
                return false;

            try
            {
                bool res = await action();
                if (res)
                    return true;
            }
            catch (Exception e)
            {
                // Log it
            }

            retryCount--;
            await Task.Delay(200); // retry in 200
        }
    }
}

И следующий код выполнения:

BlockingCollection<int> ints = new BlockingCollection<int>();
foreach (int i in Enumerable.Range(0, 100000))
{
    ints.Add(i);
}
ints.CompleteAdding();

int taskId = 0;
var enumerable = new AsyncEnumerable<int>(async yield =>
{
    await RetryComponent.RunTasks(async () =>
    {
        try
        {
            int myTaskId = Interlocked.Increment(ref taskId);

            // usually there are async/await operations inside the while loop, this is just an example

            while (!ints.IsCompleted)
            {
                int number = ints.Take();

                Console.WriteLine($"Task {myTaskId}: {number}");
                await yield.ReturnAsync(number);
            }
        }
        catch (InvalidOperationException)
        {
            return true;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }

        return true;
    }, 10, 1, MethodBase.GetCurrentMethod().Name);
});

await enumerable.ForEachAsync(number =>
{
    Console.WriteLine(number);
});

где AsyncEnumerable происходит от System.Collections.Async.

В консоли отображается Task 10: X (где x — число в списке..).

Когда я удаляю AsyncEnumerable, все работает как задумано (все задачи печатаются и выполнение заканчивается).. По какой-то причине, которую я не могу найти в течение длительного времени, использование AsyncEnumerable просто все портит (в моем основном коде мне нужно использовать AsyncEnumerable... масштабируемость...), что означает, что код никогда не останавливается и только последняя задача (10 ) печатает. когда я добавил больше журналов, я вижу, что задачи 1-9 никогда не заканчиваются.

Итак, чтобы прояснить ситуацию, я хочу, чтобы несколько задач выполняли асинхронные операции и выдавали результаты одному объекту AsyncEnumerable, который действует как канал. (это была идея..)

Как это все портит?

ColinM 28.01.2019 14:02

потому что, когда его нет в AsyncEnumerable, все работает по назначению, и все задачи печатаются (а не только задача 10). Кроме того, эта задача не закончится, потому что задачи 1-9 никогда не заканчиваются по причине, которую я не знаю

Ori Refael 28.01.2019 14:04

Итак, при использовании AsyncEnumerable он печатает только Task 10: {number}? Пожалуйста, помните, что формулируя вопросы, только вы понимаете контекст, стоящий за этим, лучше всего описать проблему и ожидаемое/предполагаемое решение, чтобы помочь нам понять.

ColinM 28.01.2019 14:05

Я думаю, что сделал это более ясным сейчас.

Ori Refael 28.01.2019 14:07

@Selvin, да, я написал параллель по ошибке, я не это имел в виду

Ori Refael 28.01.2019 14:10

Все еще не очень понимаю, каков ожидаемый результат, следует ли перейти к Task 11 и Task 12? Вы указываете 10 задач для запуска, но ваш BlockingCollection имеет 100 000 элементов, внутри которых полностью обрабатывается ваш код, последняя задача выбирает все остальные элементы, полученные из ints. Можете ли вы привести примеры того, как это выглядит с неасинхронным перечислением и каково желаемое поведение?

ColinM 28.01.2019 14:45

Все 10 задач должны отображать числа. Это довольно прямолинейно из кода. Задача просто берет число и печатает его. Проблема в том, что когда я использую AsyncEnumerable, задачи 1-9 просто ничего не отображают. Как будто они сброшены. также выполнение не остановится, как будто задачи все еще выполняются, но ничего не делают

Ori Refael 28.01.2019 14:46

Поэтому я запускал код локально, задачи 1-9 находятся в самом верху командного окна.

ColinM 28.01.2019 15:07

Хорошо. Так почему же только задание 10 печатает результаты? Потому что, когда вы не используете IAsyncEnumerable, они также будут печатать результаты

Ori Refael 28.01.2019 15:10

Прошу прощения, я прокомментировал это сегодня во время обеденного перерыва. Посмотрев, я все еще озадачен тем, чего именно вы пытаетесь достичь с помощью различных фрагментов логики. Из локального запуска я вижу, что ваши первые 9 задач заблокированы, а 10 подхватывают остальные. Вы пытаетесь создать очередь pub-sub?

ColinM 28.01.2019 22:30

@ColinM представьте себе asyncEnumerable как канал, а задачи внутри как производители. они производят и отправляют данные, которые они делают, через конвейер. Проблема в том, что если вы запускаете программу, работает только последняя задача, а другая не завершается. если вы прокомментируете обертку «var enumerable = ...", вы увидите, что все задачи работают, как задумано, и программа завершает выполнение. с оберткой на asyncEnumerable (канал) работает только последняя задача, а остальные никогда не заканчиваются

Ori Refael 29.01.2019 10:54

Я не знаком с AsyncEnumerable, спасибо за описание контекста его выполнения. Что я обнаружил с этим кодом, так это то, что когда я вместо этого использовал foreach на ints без использования Take, он мог правильно выполнять итерацию задач, что заставило меня поверить, что использование BlockingCollection и Take блокирует / блокирует ваш первый задачи, пока они ждут, пока очередь снова станет доступной. Код также никогда не проходит дальше блока ForEachAsync.

ColinM 29.01.2019 12:02

@ColinM как? я изменил его, чтобы взять элемент из списка (вместо blockingCollection) и те же результаты.

Ori Refael 29.01.2019 14:07
pastebin.com/cJ6rLW7A — я переместил вызов Console.WriteLine после yield.ReturnAsync, чтобы он печатал Task {taskId}: {number} только после выполнения задания.
ColinM 29.01.2019 14:33
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
15
897
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Проблема в том, что шаблон перечислителя/генератора является последовательным, но вы пытаетесь создать шаблон с несколькими производителями и одним потребителем. Поскольку вы используете вложенные анонимные функции, а переполнение стека не показывает номера строк, сложно точно описать, на какую часть кода я ссылаюсь, но я все равно попробую.

Принцип работы AsyncEnumerable заключается в том, чтобы дождаться, пока производитель произведет значение, затем дождаться, пока потребитель использует это значение, а затем повторить. Он не поддерживает производитель и потребитель, работающие с разной скоростью, поэтому я говорю, что этот шаблон является последовательным. У него нет очереди произведенных предметов, только текущее значение. ReturnAsync не ждет, чтобы потребитель использовал значение, вместо этого вы должны ждать задачи, которую он возвращает, что дает вам сигнал о том, что она готова. Поэтому мы можем сделать вывод, что это не потокобезопасно.

Однако RetryComponent.RunTasks выполняет 10 задач параллельно, и этот код вызывает yield.ReturnAsync, не проверяя, не вызывал ли его кто-либо еще, и если да, то завершена ли эта задача. Поскольку класс Yield хранит только текущее значение, ваши 10 одновременных задач перезаписывают текущее значение, не дожидаясь готовности объекта Yield для нового значения, поэтому 9 задач теряются и никогда не ожидаются. Поскольку эти 9 задач никогда не ожидаются, методы никогда не завершатся, и Task.WhenAll никогда не вернется, как и другие методы во всем стеке вызовов.

Я создал вопрос на github предлагает им улучшить свою библиотеку, чтобы в таких случаях возникали исключения. Если они это реализуют, ваш блок catch выведет сообщение на консоль и повторно выдаст ошибку, поставив задачу в состояние сбоя, что позволит task.WhenAll завершиться и, следовательно, ваша программа не зависнет.

Вы можете использовать многопоточные API-интерфейсы синхронизации, чтобы гарантировать, что только одна задача за раз вызывает yield.ReturnAsync и ожидает возврата задачи. Или вы можете избежать использования шаблона с несколькими производителями, поскольку один производитель может легко быть перечислителем. В противном случае вам придется полностью переосмыслить, как вы хотите реализовать шаблон с несколькими производителями. Я предлагаю Поток данных TPL, который встроен в .NET Core и доступен в .NET Framework в виде пакета NuGet.

я, черт возьми, знал, что с этим что-то не так.. я проверял свой код больше раз, чем могу сосчитать..

Ori Refael 04.02.2019 16:03

@zivkan абсолютно прав насчет последовательного шаблона производителя. Если вы хотите иметь параллельных производителей для одного потока, это все еще можно реализовать с помощью библиотеки AsyncEnumerable, но для этого потребуется дополнительный код.

Вот пример решения проблемы с параллельными производителями и потребителями (в данном случае только с одним потребителем):

        static void Main(string[] args)
        {
            var e = new AsyncEnumerable<int>(async yield =>
            {
                var threadCount = 10;
                var maxItemsOnQueue = 20;

                var queue = new ConcurrentQueue<int>();
                var consumerLimiter = new SemaphoreSlim(initialCount: 0, maxCount: maxItemsOnQueue + 1);
                var produceLimiter = new SemaphoreSlim(initialCount: maxItemsOnQueue, maxCount: maxItemsOnQueue);

                // Kick off producers
                var producerTasks = Enumerable.Range(0, threadCount)
                    .Select(index => Task.Run(() => ProduceAsync(queue, produceLimiter, consumerLimiter)));

                // When production ends, send a termination signal to the consumer.
                var endOfProductionTask = Task.WhenAll(producerTasks).ContinueWith(_ => consumerLimiter.Release());

                // The consumer loop.
                while (true)
                {
                    // Wait for an item to be produced, or a signal for the end of production.
                    await consumerLimiter.WaitAsync();

                    // Get a produced item.
                    if (queue.TryDequeue(out var item))
                    {
                        // Tell producers that they can keep producing.
                        produceLimiter.Release();
                        // Yield a produced item.
                        await yield.ReturnAsync(item);
                    }
                    else
                    {
                        // If the queue is empty, the production is over.
                        break;
                    }
                }
            });

            e.ForEachAsync((item, index) => Console.WriteLine($"{index + 1}: {item}")).Wait();
        }

        static async Task ProduceAsync(ConcurrentQueue<int> queue, SemaphoreSlim produceLimiter, SemaphoreSlim consumerLimiter)
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                await Task.Delay(10);
                var value = rnd.Next();

                await produceLimiter.WaitAsync(); // Wait for the next production slot
                queue.Enqueue(value); // Produce item on the queue
                consumerLimiter.Release(); // Notify the consumer
            }
        }

Другие вопросы по теме