Определение завершения длительных асинхронных задач

[Обновлено 18 апреля 2018 г. с примером LinqPad — см. конец]

Мое приложение получает список заданий:

var jobs = await myDB.GetWorkItems();

(Примечание: мы используем .ConfigureAwait(false) везде, я просто не показываю его в этих фрагментах псевдокода.)

Для каждого задания мы создаем долгоиграющую задачу. Однако мы не хотим ждать завершения этой длительной задачи.

jobs.ForEach(job =>
{
    var module = Factory.GetModule(job.Type);
    var task = Task.Run(() => module.ExecuteAsync(job.Data));
    this.NonAwaitedTasks.Add(task, module);
};

Задача и связанный с ней экземпляр модуля добавляются в ConcurrentDictionary, чтобы они не выходили за рамки.

В другом месте у меня есть другой метод, который иногда вызывается и содержит следующее:

foreach (var entry in this.NonAwaitedTasks.Where(e => e.Key.IsCompleted))
{
    var module = entry.Value as IDisposable;
    module?.Dispose();
    this.NonAwaitedTasks.Remove(entry.Key);
}

(Обратите внимание, что NonAwaitedTasks дополнительно блокируется с помощью SemaphoreSlim...)

Итак, идея состоит в том, что этот метод найдет все те задачи, которые были завершены, а затем избавится от связанного с ними модуля и удалит их из этого словаря.

Однако....

Во время отладки в Visual Studio 2017 я извлекаю одно задание из БД, и пока я трачу время на отладку в одном экземпляре модуля, в этом модуле вызывается Dispose. Глядя в стек вызовов, я вижу, что Dispose был вызван в методе выше, и это потому, что задача имеет IsCompleted == true. Но, очевидно, это не может быть завершено, потому что я все еще отлаживаю его.

  • Является ли свойство .IsCompleted неправильным свойством для проверки?
  • Это просто артефакт отладки в Visual Studio?
  • Я иду об этом неправильно?

Дополнительная информация

В комментариях ниже меня попросили предоставить некоторую дополнительную информацию о потоке, потому что то, что я описал, казалось невозможным (и я действительно надеялся, что этого не может быть). Ниже приведена урезанная версия моего кода (я удалил проверки токена отмены и защитного кодирования, но ничего, что влияет на поток).

Точка входа приложения

Это служба Windows. В OnStart() есть следующая строка:

this.RunApplicationTask = 
Task.Run(() => myApp.DoWorkAsync().ConfigureAwait(false), myService.CancelSource.Token);

«RunApplicationTask» — это просто свойство, позволяющее сохранять выполняемую задачу в области действия в течение всего срока службы службы.

ДоворкАсинк()

public async Task DoWorkAsync()
{
    do
    {
        await this.ExecuteSingleIterationAsync().ConfigureAwait(false);
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
    }
    while (myApp.ServiceCancellationToken.IsCancellationRequested == false);

    await Task.WhenAll(this.NonAwaitedTasks.Keys).ConfigureAwait(false);
    await this.ClearCompletedTasksAsync().ConfigureAwait(false);
    this.WorkItemsTaskCompletionSource.SetResult(true);

    return;
}

Итак, пока я отлаживаю, это итерация DO-LOOP, она не попадает в Task.WhenAll(....).

Также обратите внимание, что после вызова запроса на отмену и завершения всех задач я вызываю ClearCompletedTasksAsync(). Об этом позже....

ExecuteSingleIterationAsync

private async Task ExecuteSingleIterationAsync()
{
    var getJobsResponse = await DB.GetJobsAsync().ConfigureAwait(false);
    await this.ProcessWorkLoadAsync(getJobsResponse.Jobs).ConfigureAwait(false);
    await this.ClearCompletedTasksAsync().ConfigureAwait(false);
}

Процессворковлоадасинк

private async Task ProcessWorkLoadAsync(IList<Job> jobs)
{
    if (jobs.NoItems())
    {
        return ;
    }

    jobs.ForEach(job =>
    {
        // The processor instance is disposed of when removed from the NonAwaitedTasks collection.
        IJobProcessor processor = ProcessorFactory.GetProcessor(workItem, myApp.ServiceCancellationToken);
        try
        {
            var task = Task.Run(() => processor.ExecuteAsync(job).ConfigureAwait(false), myApp.ServiceCancellationToken);
            this.NonAwaitedTasks.Add(task, processor);
        }
        catch (Exception e)
        {
            ...
        }
    });

    return;
}

Каждый процессор реализует следующий метод интерфейса: Задача ExecuteAsync (рабочее задание);

Когда я нахожусь в ExecuteAsync, .Dispose() вызывается для экземпляра процессора, который я использую.

ProcessorFactory.GetProcessor()

public static IJobProcessor GetProcessor(Job job, CancellationToken token)
{
    .....
    switch (someParamCalculatedAbove)
    {
        case X:
            {
                return new XProcessor(...);
            }

        case Y:
            {
                return new YProcessor(...);
            }

        default:
            {
                return null;
            }
    }
}

Итак, здесь мы получаем экземпляр новый.

ClearCompletedTasksAsync()

private async Task ClearCompletedTasksAsync()
{
    await myStatic.NonAwaitedTasksPadlock.WaitAsync().ConfigureAwait(false);
    try
    {
        foreach (var taskEntry in this.NonAwaitedTasks.Where(entry => entry.Key.IsCompleted).ToArray())
        {
            var processorInstance = taskEntry.Value as IDisposable;
            processorInstance?.Dispose();
            this.NonAwaitedTasks.Remove(taskEntry.Key);
        }
    }
    finally
    {
        myStatic.NonAwaitedTasksPadlock.Release();
    }
}

Это называется каждой итерацией Do-Loop. Его цель состоит в том, чтобы гарантировать, что список неожидаемых задач остается небольшим.

Вот и все... Кажется, что Dispose вызывается только при отладке.

Пример LinqPad

async Task Main()
{
    SetProcessorRunning();

    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

    do
    {
        foreach (var entry in NonAwaitedTasks.Where(e => e.Key.IsCompleted).ToArray())
        {
            "Task is completed, so will dispose of the Task's processor...".Dump();
            var p = entry.Value as IDisposable;
            p?.Dispose();
            NonAwaitedTasks.Remove(entry.Key);
        }
    }
    while (NonAwaitedTasks.Count > 0);
}

// Define other methods and classes here

public void SetProcessorRunning()
{
    var p = new Processor();

    var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));
    NonAwaitedTasks.Add(task, p);
}

public interface IProcessor
{
    Task DoWorkAsync();
}

public static Dictionary<Task, IProcessor> NonAwaitedTasks = new Dictionary<Task, IProcessor>();

public class Processor : IProcessor, IDisposable
{
    bool isDisposed = false;
    public void Dispose()
    {
        this.isDisposed = true;
        "I have been disposed of".Dump();
    }

    public async Task DoWorkAsync()
    {
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);

        if (this.isDisposed)
        {
            $"I have been disposed of (isDispose = {this.isDisposed}) but I've not finished work yet...".Dump();
        }

        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
    }
}

Выход:

Task is completed, so will dispose of the Task's processor...

I have been disposed of

I have been disposed of (isDispose = True) but I've not finished work yet...

Примечание: называть предположительно синхронный метод ExecuteAsync с суффиксом ...Async очень запутанно. Пожалуйста, не делайте этого, по крайней мере, для общедоступных образцов, которые вы показываете здесь (он же минимальный воспроизводимый пример).

Alexei Levenkov 09.04.2019 20:38

Точка зрения @AlexeiLevenkov может указывать и на реальную проблему. Является ли Module.ExecuteAsync() асинхронным методом? Если это так, то Task.Run() буду завершается до завершения ExecuteAsync() (при условии, что у вас есть ожидания в ExecuteAsync()).

sellotape 09.04.2019 21:37

Задача не будет отмечена как выполненная, пока она не будет завершена, поэтому то, что вы описываете, невозможно. Должно быть что-то еще, например, другое задание с тем же Type (и, следовательно, тем же модулем), или async void, или код «запустил и забыл».

Stephen Cleary 10.04.2019 01:43

Стивен прав; Я (ошибочно) думал о конструкторе Task, где ваша лямбда может выглядеть как функция, но на самом деле является действием, поэтому игнорируйте мой комментарий выше. Я думаю, вам нужно показать нам подпись ExecuteAsync().

sellotape 10.04.2019 08:43

Спасибо за комментарии. Я добавил гораздо больше деталей в исходный пост.

DrGriff 10.04.2019 10:07

Попробуйте избавиться от всех Task.Run и работать только с Task, которые возвращают ваши методы. В большинстве хороших асинхронных кодов почти нет Task.Run, особенно для вызова методов, которые уже возвращают Task.

Damien_The_Unbeliever 10.04.2019 11:52

@Дэмиен. Это не так просто. У меня есть жесткий цикл, который вытягивает рабочую нагрузку из БД и запускает множество длительных задач. Он не хочет торчать, ожидая их завершения, а идет к БД и смотрит, есть ли еще какая-то работа.

DrGriff 18.04.2019 21:11

@ Стивен Клири. Я отредактировал свой пост, включив в него урезанный скрипт LinqPad, демонстрирующий это. Надеюсь, кому-то понятно, в чем моя ошибка...

DrGriff 18.04.2019 21:17

@DrGriff Task.Run не обязательно не ждать задачи. Чтобы не ждать задачи, вы можете просто не ждать ее, даже если вы не планируете поток пула потоков для запуска асинхронной операции за вас.

Servy 18.04.2019 22:06
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
9
296
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Ваша проблема в этой строке:

var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));

Наведите курсор на var и посмотрите, что это за тип.

Task.Run понимает async делегатов благодаря специальным правилам «распаковки задач» для Func<Task<Task>> и друзей. Но особой распаковки для Func<ConfiguredTaskAwaitable> не будет.

Вы можете думать об этом таким образом; с кодом выше:

  1. p.DoWorkAsync() возвращает Task.
  2. Task.ConfigureAwait(false) возвращает ConfiguredTaskAwaitable.
  3. Таким образом, Task.Run запрашивается запуск этой функции, которая создает ConfiguredTaskAwaitable в потоке пула потоков.
  4. Таким образом, возвращаемый тип Task.Run — это Task<ConfiguredTaskAwaitable> — задача, которая завершается, как только создается ConfiguredTaskAwaitable. Когда это созданный - не когда он завершается.

В этом случае ConfigureAwait(false) все равно ничего не делает, потому что await не нужно настраивать. Итак, вы можете удалить его:

var task = Task.Run(() => p.DoWorkAsync());

Кроме того, как упоминал Серви, если вы не используете нужно для запуска DoWorkAsync в потоке пула потоков, вы также можете пропустить Task.Run:

var task = p.DoWorkAsync();

Стивен Клири и Серви оба считают, что я могу отказаться от Task.Run, если мне не нужно запускать свою задачу в потоке пула потоков. Мой — это служба Windows, которая реализует цикл, который сначала получает n-количество рабочих элементов из БД, затем устанавливает обработку каждого рабочего элемента (асинхронно, поскольку они включают ввод-вывод) в режиме «запустить и забыть», а затем выполняет другой повторение цикла. В этом случае я думаю, что асинхронный режим «выстрелил и забыл» должен быть в потоках poop. Вы бы согласились?

DrGriff 23.04.2019 12:19

Я почти никогда не рекомендую принцип «выстрелил и забыл». Вы абсолютно уверены, что хотите, чтобы исключения поглощались молча?

Stephen Cleary 23.04.2019 15:25

Понял. В этом случае асинхронный метод, который вызывается по принципу «запустил и забыл», является автономным; он выполняет множество действий, обрабатывает исключения и регистрирует состояние в БД, поэтому ничего не проглатывается молча. Нечего «отчитываться» — мы хотим только сохранить теги в Задаче, чтобы Служба не могла остановиться, пока она не будет завершена. Поэтому я думаю, что это должен быть поток пула потоков...

DrGriff 24.04.2019 10:18

«чтобы Служба не могла остановиться, пока не будет завершена» — это еще одна причина, по которой я не рекомендую «выстрелил и забыл». Если бы вы сохранили Tasks, вы могли бы await на них во время выключения. Мне это кажется чище, чем «сохранение тегов». Другими словами, вы уже есть сигнал (Task), который уведомляет вас о завершении фоновой задачи; зачем строить еще один?

Stephen Cleary 24.04.2019 14:48

Это то, что мы делаем. Асинхронный метод, выполняемый в задаче, является автономным (как описано выше). Ему нечего возвращать, когда он будет завершен. Однако мы добавляем задачу в словарь (this.NonAwaitedTasks) и периодически удаляем все те задачи, которые завершились. Единственная цель этого словаря состоит в том, чтобы, когда кто-то нажимает [STOP], мы ждали WHENALL для оставшихся задач. Таким образом, это эффективно «выстрелил и забыл, пока кто-то не щелкнет СТОП». Но остается вопрос, должен ли я использовать Task.Run(...) специально для их запуска из пула потоков?

DrGriff 24.04.2019 20:31

Конечно, это нормально. Task.Run приятно разгрузить все в пул потоков и вообще не блокировать вызывающий код. Если вы не игнорируете задание из Task.Run, то это вовсе не «выстрелил-забыл».

Stephen Cleary 24.04.2019 20:56

Другие вопросы по теме