Создание наблюдателя за асинхронными ресурсами в c# (ресурс очереди сервис-брокера)

Отчасти в качестве упражнения по изучению асинхронности, я решил попробовать создать класс ServiceBrokerWatcher. Идея во многом такая же, как у FileSystemWatcher - наблюдать за ресурсом и вызывать событие, когда что-то происходит. Я надеялся сделать это с помощью async, а не создавать поток, потому что природа зверя означает, что большую часть времени он просто ждет SQL-оператора waitfor (receive ...). Это казалось идеальным использованием async.

Я написал код, который «работает»: когда я отправляю сообщение через брокера, класс замечает это и запускает соответствующее событие. Я думал, что это было супер аккуратно.

Но я подозреваю, что где-то в моем понимании происходящего я обнаружил что-то принципиально неправильное, потому что, когда я пытаюсь остановить наблюдателя, он ведет себя не так, как я ожидал.

Сначала краткий обзор компонентов, а затем собственно код:

У меня есть хранимая процедура, которая выдает waitfor (receive...) и возвращает результат клиенту при получении сообщения.

Существует Dictionary<string, EventHandler>, который сопоставляет имена типов сообщений (в наборе результатов) с соответствующими обработчиками событий. Для простоты в примере у меня есть только один тип сообщения.

У класса наблюдателя есть асинхронный метод, который повторяется «бесконечно» (до тех пор, пока не будет запрошена отмена), который содержит выполнение процедуры и возникновение событий.

Так в чем проблема? Что ж, я попытался разместить свой класс в простом приложении winforms, и когда я нажимаю кнопку для вызова метода StopListening() (см. Ниже), выполнение не отменяется сразу, как я думал. Линия listener?.Wait(10000) фактически будет ждать 10 секунд (или сколько бы я ни установил тайм-аут). Если я смотрю, что происходит с профилировщиком SQL, я вижу, что событие «внимание» отправляется «сразу», но функция все равно не завершается.

Я добавил комментарии к коду, начинающиеся с "!" где я подозреваю, что я что-то неправильно понял.

Итак, главный вопрос: почему мой метод ListenAsync не "соблюдает" мой запрос на отмену?

Кроме того, правильно ли я думаю, что эта программа (большую часть времени) использует только один поток? Я сделал что-нибудь опасное?

Код следует, я старался урезать его, насколько мог:

// class members //////////////////////
private readonly SqlConnection sqlConnection;
private CancellationTokenSource cts;
private readonly CancellationToken ct;
private Task listener;
private readonly Dictionary<string, EventHandler> map;

public void StartListening()
{
    if (listener == null)
    {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        // !I suspect assigning the result of the method to a Task is wrong somehow...
        listener = ListenAsync(ct); 
    }
}

public void StopListening()
{
    try
    {
        cts.Cancel(); 
        listener?.Wait(10000); // !waits the whole 10 seconds for some reason
    } catch (Exception) { 
        // trap the exception sql will raise when execution is cancelled
    } finally
    {
        listener = null;
    }
}

private async Task ListenAsync(CancellationToken ct)
{
    using (SqlCommand cmd = new SqlCommand("events.dequeue_target", sqlConnection))
    using (CancellationTokenRegistration ctr = ct.Register(cmd.Cancel)) // !necessary?
    {
        cmd.CommandTimeout = 0;
        while (!ct.IsCancellationRequested)
        {
            var events = new List<string>();    
            using (var rdr = await cmd.ExecuteReaderAsync(ct))
            {
                while (rdr.Read())
                {
                    events.Add(rdr.GetString(rdr.GetOrdinal("message_type_name")));
                }
            }
            foreach (var handler in events.Join(map, e => e, m => m.Key, (e, m) => m.Value))
            {
                if (handler != null && !ct.IsCancellationRequested)
                {
                    handler(this, null);
                }
            }
        }
    }
}

Приложение: если я запускаю профилировщик запросов, я вижу, что событие «внимание» попадает в SQL Server почти сразу после вызова StopListening(), так что эта часть, похоже, работает, но listenerTask все еще не завершается. Я чувствую, что, возможно, пропустил здесь что-то совершенно очевидное ...

allmhuran 26.10.2018 10:05

Звоните ct.ThrowIfCancellationRequested() вместо проверки ct.IsCancellationRequested в ListenAsync

Collin Dauphinee 26.10.2018 10:28

Спасибо, Коллин, но я, должно быть, делаю что-то неправильное. Я попытался добавить ct.ThrowIfCancellationRequested() в нескольких разных местах функции (в начале и в конце цикла while), но поведение осталось прежним.

allmhuran 26.10.2018 15:47
1
3
438
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вы не показываете, как вы связали его с приложением WinForms, но если вы используете обычные методы void button1click, вы можете столкнуться с Эта проблема.

Таким образом, ваш код будет нормально работать в консольном приложении (это произойдет, когда я попробую), но при вызове через поток пользовательского интерфейса возникнет тупик.

Я бы предложил изменить класс вашего контроллера, чтобы открыть методы запуска и остановки async и вызывать их, например:

    private async void btStart_Click(object sender, EventArgs e)
    {
        await controller.StartListeningAsync();
    }

    private async void btStop_Click(object sender, EventArgs e)
    {
        await controller.StopListeningAsync();
    }

Спасибо, Питер. Да, я использовал простые обработчики кнопок void, которые создаются автоматически, когда вы просто дважды щелкаете элемент управления в области конструктора. Приложение уже реагировало на нажатие кнопки запуска. Я могу сделать этот вызов асинхронным, но он ведет себя точно так же - события в очереди запускают обработчик, который добавляет строку текста в текстовое поле. Программу еще можно перетащить и так далее. Я не уверен, что я бы использовал await в методе StopListening(). Там нет ожидаемых, только cts.Cancel(), а затем я даю задаче шанс выйти.

allmhuran 26.10.2018 19:17

При дальнейшем исследовании кажется, что только метод остановки должен быть асинхронным. Программа по-прежнему работает правильно с исходным определением StartListening(). Кроме того, в методе StopListening() мне не нужно добавлять .ConfigureAwait(false). Я могу просто сделать cts.Cancel(); await listener;. Но я до сих пор не совсем понимаю, что вызывает 10-секундное ожидание в исходном определении. Какие два процесса за какие ресурсы конкурируют в тупике? Если одним из них является сам метод ListenAsync(), почему он также не вызывает тупик при вызове StartListening()?

allmhuran 26.10.2018 19:59

AFAIK, взаимоблокировка связана с тем, что поток пользовательского интерфейса выполняет блокирующее ожидание завершения задачи, задаче требуется асинхронный метод для запуска продолжения, прежде чем оно может быть завершено, и задача захватила контекст потока пользовательского интерфейса, чтобы запланировать завершение. Этого не может быть, потому что поток пользовательского интерфейса заблокирован.

Peter Wishart 26.10.2018 21:52

На StartListening() это не влияет, потому что на самом деле он ничего не ожидает, так что да, это может быть обычный метод void. Или он мог бы вернуть Task.CompletedTask и ожидать, не будучи на самом деле async, просто для симметрии.

Peter Wishart 26.10.2018 21:56

У Питера был правильный ответ. Несколько минут я был сбит с толку из-за того, что зашло в тупик, но потом я хлопнул себя лбом. Это продолжение ListenAsync после отмены ExecuteReaderAsync, потому что это просто задача, а не отдельный поток. В конце концов, в этом и был весь смысл!

Тогда я подумал ... Хорошо, а что, если я скажу асинхронной части ListenAsync(), что ей не нужен поток пользовательского интерфейса. Я буду называть ExecuteReaderAsync(ct) с .ConfigureAwait(false)! Ага! Теперь методы класса больше не должны быть асинхронными, потому что в StopListening() я могу просто listener.Wait(10000), ожидание продолжит внутреннюю задачу в другом потоке, и потребитель не станет мудрее. О мальчик, такой умный.

Но нет, я не могу этого сделать. По крайней мере, не в приложении веб-форм. Если я это сделаю, текстовое поле не обновится. И причина этого кажется достаточно ясной: внутренности ListenAsync вызывают обработчик событий, и этот обработчик событий представляет собой функцию, которая хочет обновить текст в текстовом поле, что, несомненно, должно происходить в потоке пользовательского интерфейса. Таким образом, он не блокируется, но он также не может обновлять пользовательский интерфейс. Если я установил точку останова в обработчике, который хочет обновить пользовательский интерфейс, будет выполнена строка кода, но пользовательский интерфейс не может быть изменен.

Так что, в конце концов, кажется, что единственное решение в этом случае - действительно «полностью перейти на асинхронный режим». Или в этом случае вверх!

Я надеялся, что мне не нужно этого делать. Тот факт, что внутренности моего Watcher используют асинхронные методологии, а не просто порождают поток, на мой взгляд, является «деталью реализации», о которой вызывающему не следует заботиться. Но у FileSystemWatcher точно такая же проблема (необходимость в control.Invoke, если вы хотите обновить графический интерфейс на основе события наблюдателя), так что это не так уж и плохо. Если бы я был потребителем, которому приходилось выбирать между использованием async или Invoke, я бы выбрал async!

Другие вопросы по теме