Как избежать возникновения исключения InvalidOperationException, когда ChannelReader.WaitToReadAsync?

Я написал асинхронную очередь, используя System.Threading.Channels. но когда я запустил программу для тестирования, в случайное время было выбрано следующее исключение, и рабочий поток был остановлен.

System.InvalidOperationException: The asynchronous operation has not completed.
   at System.Threading.Channels.AsyncOperation.ThrowIncompleteOperationException()
   at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
   at AsyncChannels.Worker() in g:\src\gitrepos\dotnet-sandbox\channelstest\AsyncChannelsTest.cs:line 26

Если Exception было перехвачено и проигнорировано, код работает. Но я хочу избавиться от ошибки, причина которой не ясна.

вот моя среда и минимум кода.

  • TargetFramework = сетевое приложение2.1
  • Версия System.Threading.Channels = 4.5.0
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using System;
using System.Linq;

class AsyncChannels : IDisposable
{
    Channel<TaskCompletionSource<bool>> _Channel;
    Thread _Thread;
    CancellationTokenSource _Cancellation;
    public AsyncChannels()
    {
        _Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
        _Thread = new Thread(Worker);
        _Thread.Start();
        _Cancellation = new CancellationTokenSource();
    }
    private void Worker()
    {
        while (!_Cancellation.IsCancellationRequested)
        {
            // System.InvalidOperationException is thrown
            if (!_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result)
            {
                break;
            }
            while (_Channel.Reader.TryRead(out var item))
            {
                item.TrySetResult(true);
            }
        }
    }
    public void Dispose()
    {
        _Cancellation.Cancel();
        _Channel.Writer.TryComplete();
        _Thread.Join();
    }
    public Task<bool> Enqueue()
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _Channel.Writer.TryWrite(tcs);
        return tcs.Task;
    }
    public static async Task Test()
    {
        using (var queue = new AsyncChannels())
        {
            for (int i = 0; i < 100000; i++)
            {
                await queue.Enqueue().ConfigureAwait(false);
            }
        }
    }
}

Вы отлаживали, вы уверены, что исключение не возникает, когда вы отменяете задачу во время Dispose?

peeyush singh 25.04.2019 04:54
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
1
1 016
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам не может напрямую заблокировать ValueTask<T> нравится это:

_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result

Вы можете сделать только две вещи с ValueTask<T>: await его (только один раз) или преобразовать его в Task<T>, вызвав AsTask(). Если вам нужно сделать что-то сложное, например, awaitвыполнить более одного раза или заблокировать его, вам нужно использовать AsTask().

Или, в этом случае, просто используйте await в стандартной схеме потребления каналов:

class AsyncChannels : IDisposable
{
  Channel<TaskCompletionSource<bool>> _Channel;
  Task _Thread;
  CancellationTokenSource _Cancellation;
  public AsyncChannels()
  {
    _Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
    _Thread = Task.Run(() => WorkerAsync());
    _Cancellation = new CancellationTokenSource();
  }
  private async Task WorkerAsync()
  {
    try
    {
      while (await _Channel.Reader.WaitToReadAsync(_Cancellation.Token))
        while (_Channel.Reader.TryRead(out var item))
          item.TrySetResult(true);
    }
    catch (OperationCanceledException)
    {
    }
  }
  public void Dispose()
  {
    _Cancellation.Cancel();
    _Channel.Writer.TryComplete();
  }
  ...
}

Другие вопросы по теме