Приложение WPF, выполняющее параллельные операции

Я разрабатываю небольшое программное приложение. общая логика того, как это будет работать, такова.

  1. Строка UDP транслируется в локальной сети.
  2. Приложение захватит эту строку в фоновом режиме и на основе ее содержимого выполнит несколько простых операций (обновление локальной базы данных — копирование файлов из a в b).

Прямо сейчас я использую этот метод, чтобы начать прослушивание в фоновом режиме, и я все еще могу использовать свое приложение, пока это происходит.

public async Task StartListeningAsync()
{
    Debug.WriteLine("UDP LISTENER IS ACTIVE");
    udpClient = new UdpClient(udpPort);

    try
    {
        while (true)
        {
            UdpReceiveResult result = await udpClient.ReceiveAsync();
            string receivedString = Encoding.UTF8.GetString(result.Buffer);
            OnUdpMessageReceived(receivedString);
        }
    }
    catch (Exception ex)
    {
        Debug.WriteLine($"Error in UDP listener: {ex.Message}");
    }
    finally
    {
        udpClient.Close();
    }
}

Мой вопрос: Чтобы запустить операции, которые я объяснил в пункте 2, каков наилучший подход, которому мне следует следовать, чтобы позволить мне использовать приложение во время выполнения этих задач.

Сейчас я использую обработчик событий, который инициализируется при запуске приложения:

private async void UdpMessageReceivedHandler(object sender, UdpMessageEventArgs e)
{
    string receivedMessage = e.Message;

    // Handle the received message (e.g., trigger specific actions)
    if (receivedMessage.StartsWith("A"))
    {
        // Trigger Action A
        Debug.WriteLine("Action A triggered!");
        // Implement your action logic here
        string sourceDirectory = @"C:\Users\Documents...";
        string destinationDirectory = @"C:\Users\Documents...";

        await Task.Run(() => CopyFiles(sourceDirectory, destinationDirectory));
    }
    else if (receivedMessage.StartsWith("B"))
    {
        // Trigger Action B
        Debug.WriteLine("Action B triggered!");
        // Implement your action logic here
    }
}

Проблема в том, что я знаю, что Task.Run может вызвать задачу, которая никогда не будет завершена, и меня беспокоит, что это может вызвать ошибки.

Правильный ли этот подход или есть более элегантный способ достижения моей цели?

Вы можете использовать прослушиватель событий asnc void или AsyncRelayCommand, чтобы начать прослушивание. Может быть, посмотрите MVVM.

Daniel W. 25.04.2024 12:58

Это асинхронные, а не параллельные операции. Есть огромная разница. Параллельность означает использование нескольких ядер для обработки большого количества уже доступных данных. Асинхронность означает выполнение чего-то, что не требует процессора, например ввода-вывода, без блокировки и обработки результатов.

Panagiotis Kanavos 25.04.2024 12:58

НИКОГДА не следует использовать async void ни для чего, кроме обработчиков асинхронных событий для устаревших компонентов. Таких методов нельзя ожидать. Вы также должны объяснить, что вы на самом деле хотите сделать, а не то, как вы ожидаете, что это будет работать. Непонятно, что OnUdpMessageReceived, но если это каким-либо образом связано с UdpMessageReceivedHandler, ваш код, скорее всего, потерпит неудачу.

Panagiotis Kanavos 25.04.2024 13:00

Похоже, что на самом деле вам нужно обрабатывать входящие сообщения, а не запускать что-либо. Вы можете использовать ActionBlock<T> или Channel и Parallel.ForEachAsync для реализации рабочих очередей даже в длинных асинхронных конвейерах. Предполагая, что у вас есть способ правильно читать сообщения (сейчас это, по сути, случайные символы), вы можете использовать, например, await copyWorker.SendAsync(message); для отправки сообщений разным работникам для обработки или для головного блока, который отправляет сообщения нужному работнику.

Panagiotis Kanavos 25.04.2024 13:04

Но сейчас у вас нет возможности узнать, получали ли вы какие-либо сообщения. Длинные сообщения могут быть разделены на несколько датаграмм. UDP ненадежен, а это значит, что датаграммы очень легко потерять.

Panagiotis Kanavos 25.04.2024 13:07
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
5
128
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

проблема в том, что я знаю, что Task.Run может вызвать задачу, которая никогда не будет завершена, и я беспокоюсь, что это может вызвать ошибки.

Очевидная проблема с этим кодом заключается в отсутствии обработки исключений. С этим довольно легко справиться, поместив все внутри UdpMessageReceivedHandler в try-catch и обработав любые исключения. Как с ними справиться, зависит от вас, опубликуйте сообщение в ветке пользовательского интерфейса? Зафиксировать неудачу? Просто позволить приложению вылететь?

Вы также можете просто вызвать обработчик сообщений непосредственно из потока приема, например:

string receivedString = Encoding.UTF8.GetString(result.Buffer);
await UdpMessageReceivedHandler(receivedString);

...
private async Task UdpMessageReceivedHandler(string receivedString){
...

Но учтите, что это приведет к сериализации всей работы, поэтому, если это займет больше времени, чем отправка сообщений о скорости, вы столкнетесь с проблемами. В этой модели любые исключения будут передаваться тому, что вызывается StartListeningAsync, поэтому оно может их обработать.

Другой вариант — поместить все полученные сообщения в очередь (например, BlockingCollection) и использовать отдельную задачу для обработки сообщений из очереди. Это должно позволить вам проверить, завершается ли задача процесса очереди преждевременно с ошибкой, и обработать ее. Поток данных — это один из способов настроить конвейер очередей для обработки, но есть много других вариантов. Преимущество DataFlow в том, что он очень гибок, поэтому выполнять параллельную обработку сообщений довольно просто.

Также подумайте, действительно ли UDP является подходящим протоколом для вашего приложения. Встроенной надежности нет, поэтому следует ожидать потери пакетов. Вместо этого я бы рассмотрел какую-нибудь библиотеку, которая может передавать сообщения, например http, gRPC, MQTT или любой из многих других протоколов.

Я добавлю еще немного контекста, потому что вижу, что это было неясно: 1) UDP у меня работает нормально, строка, которую я буду транслировать, будет повторяться несколько раз, прежде чем измениться. 2) строка будет такой: $id,name,fileName Я разделю ее (',') и инициирую действие 1 = заполню локальную базу данных идентификатором и именем. действие 2 = переместить имя файла из A в B @Panagiotis Kanavos @jonasH

federico 25.04.2024 15:37

@federico Тот факт, что вы отправляете его несколько раз, не означает, что он будет успешным. И это вызовет проблемы, если операции не идемпотентны. Мне кажется, гораздо проще позволить какому-нибудь автору библиотеки побеспокоиться о доставке сообщений ровно один раз или гарантировать, что будет сообщено об ошибке.

JonasH 25.04.2024 16:04
Ответ принят как подходящий

Судя по тому, что вы показали, я бы использовал своего рода модель производитель-потребитель. На стороне потребителя вы можете решить, хотите ли вы обрабатывать очередь асинхронно или параллельно. Асинхронная обработка очереди позволит вашему пользовательскому интерфейсу оставаться отзывчивым, а параллельная обработка в параллельном потоке позволяет ускорить обработку (для очистки очереди используется больше ядер ЦП). Обратите внимание: не все задачи выигрывают от параллелизма. Параллелизм может даже усугубить проблему с точки зрения производительности и целостности.

Поскольку вы недостаточно описали исходную проблему, неясно, возможен ли параллелизм. Учитывая жестко закодированные пути к файлам в контексте операции копирования файлов, похоже, что это не вариант. В этом случае вы можете легко преобразовать приведенный ниже пример в чисто асинхронное решение, удалив Parallel.ForEachAsync из метода ProcessQueueAsync.

То, как вы читаете сетевой поток, также выглядит странно. В общем, нельзя полагаться на то, что каждый полученный пакет содержит разумный контент. Обычно вам приходится объединять несколько пакетов. Это означает, что вам нужна буферизация.
Вам также понадобится протокол (например, JSON-RPC), который позволит вам безопасно идентифицировать символ, описывающий действие. Вы не можете случайным образом выбирать символы из пакета или серии пакетов. В общем. На тот случай, если у вас этого не было на радаре.

Простое решение с использованием Channel<T> и Parallel.ForEachAsync может выглядеть следующим образом:

Параллельная версия

public class ChannelExample
{
  private Channel<byte[]>? channel;

  public async Task StartListeningAsync(CancellationToken cancellationToken = default)
  {
    Debug.WriteLine("UDP LISTENER IS ACTIVE");

    // Use using statement or expression over try-finally
    using var udpClient = new UdpClient();

    InitializeChannel();

    // Process queue in the background
    var backgroundTask = Task.Run(() => ProcessQueueAsync(cancellationToken), cancellationToken);

    try
    {
      while (true)
      {
        UdpReceiveResult result = await udpClient.ReceiveAsync(cancellationToken).ConfigureAwait(false);
        await EnqueueItemAsync(result.Buffer, cancellationToken).ConfigureAwait(false);
      }

      // Simply await the single background task to unwrap 
      // its exceptions and to catch them as usual...

      // ...use this line to wait for multiple tasks
      //await Task.WhenAll(backgroundTask).ConfigureAwait(false);

      // ...or simply await the single background task to unwrap 
      // its exceptions and to catch them as usual
      await backgroundTask.ConfigureAwait(false);
    }
    catch (/* TODO::Catch exceptions thrown by the backgroundTask Task if necessary */)
    {
    }
    catch (OperationCanceledException)
    {
      // TODO::Handle cancellation then recover or rethrow.
      throw;
    }
    catch (SocketException ex)
    {
      Debug.WriteLine($"Error in UDP listener: {ex.Message}");

      // We must rethrow the unhandled exception
      throw;
    }
    finally
    {
      FinalizeChannel();
    }
  }

  private async Task EnqueueItemAsync(byte[] data, CancellationToken cancellationToken)
  {
    ChannelWriter<byte[]> channelWriter = this.channel!.Writer;
    await channelWriter.WriteAsync(data, cancellationToken).ConfigureAwait(false);
  }

  private async Task ProcessQueueAsync(CancellationToken cancellationToken)
  {
    ChannelReader<byte[]> channelReader = this.channel!.Reader;
    var parallelOptions = new ParallelOptions()
    {
      CancellationToken = cancellationToken,
      MaxDegreeOfParallelism = Environment.ProcessorCount, // Adjust to optimize parallelism        
    };
    IAsyncEnumerable<byte[]> queue = channelReader.ReadAllAsync(cancellationToken);

    // Process queue in parallel
    await Parallel.ForEachAsync(queue, parallelOptions, ProcessItemAsync).ConfigureAwait(false);
  }

  private async ValueTask ProcessItemAsync(byte[] buffer, CancellationToken cancellationToken)
  {
    string bufferTextContent = Encoding.UTF8.GetString(buffer);
    switch (bufferTextContent[0])
    {
      // Trigger Action A
      case 'A':
        {
          Debug.WriteLine("Action A triggered!");
          // Implement your action logic here
          break;
        }
      // Trigger Action B
      case 'B':
        {
          Debug.WriteLine("Action B triggered!");
          // Implement your action logic here}
          break;
        }
      default:
        throw new NotSupportedException();
    }
  }

  private void InitializeChannel()
  {
    var channelOptions = new UnboundedChannelOptions()
    {
      AllowSynchronousContinuations = false,
      SingleReader = true,
      SingleWriter = true
    };

    this.channel = Channel.CreateUnbounded<byte[]>(channelOptions);
  }

  private void FinalizeChannel()
    => this.channel!.Writer.Complete();
}

Асинхронная версия

public class ChannelExample
{
  private Channel<byte[]>? channel;

  public async Task StartListeningAsync(CancellationToken cancellationToken = default)
  {
    Debug.WriteLine("UDP LISTENER IS ACTIVE");

    // Use using statement or expression over try-finally
    using var udpClient = new UdpClient();

    InitializeChannel();

    // Process queue in the background
    var backgroundTask = Task.Run(() => ProcessQueueAsync(cancellationToken), cancellationToken);

    try
    {
      while (true)
      {
        UdpReceiveResult result = await udpClient.ReceiveAsync(cancellationToken).ConfigureAwait(false);
        await EnqueueItemAsync(result.Buffer, cancellationToken).ConfigureAwait(false);
      }

      // Simply await the single background task to unwrap 
      // its exceptions and to catch them as usual...

      // ...use this line to wait for multiple tasks
      //await Task.WhenAll(backgroundTask).ConfigureAwait(false);

      // ...or simply await the single background task to unwrap 
      // its exceptions and to catch them as usual
      await backgroundTask.ConfigureAwait(false);
    }
    catch (/* TODO::Catch exceptions thrown by the backgroundTask Task if necessary */)
    {
    }
    catch (OperationCanceledException)
    {
      // TODO::Handle cancellation then recover or rethrow.
      throw;
    }
    catch (SocketException ex)
    {
      Debug.WriteLine($"Error in UDP listener: {ex.Message}");

      // We must rethrow the unhandled exception
      throw;
    }
    finally
    {
      FinalizeChannel();
    }
  }

  private async Task EnqueueItemAsync(byte[] data, CancellationToken cancellationToken)
  {
    ChannelWriter<byte[]> channelWriter = this.channel!.Writer;
    await channelWriter.WriteAsync(data, cancellationToken).ConfigureAwait(false);
  }

  private async Task ProcessQueueAsync(CancellationToken cancellationToken)
  {
    ChannelReader<byte[]> channelReader = this.channel!.Reader;
    
    // Process queue asynchronously
    while (await channelReader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
      while (channelReader.TryRead(out byte[] buffer))
      {
        await ProcessItemAsync(buffer, cancellationToken);
      } 
    }
  }

  private async Task ProcessItemAsync(byte[] buffer, CancellationToken cancellationToken)
  {
    string bufferTextContent = Encoding.UTF8.GetString(buffer);
    switch (bufferTextContent[0])
    {
      // Trigger Action A
      case 'A':
        {
          Debug.WriteLine("Action A triggered!");
          // Implement your action logic here
          break;
        }
      // Trigger Action B
      case 'B':
        {
          Debug.WriteLine("Action B triggered!");
          // Implement your action logic here}
          break;
        }
      default:
        throw new NotSupportedException();
    }
  }

  private void InitializeChannel()
  {
    var channelOptions = new UnboundedChannelOptions()
    {
      AllowSynchronousContinuations = false,
      SingleReader = true,
      SingleWriter = true
    };

    this.channel = Channel.CreateUnbounded<byte[]>(channelOptions);
  }

  private void FinalizeChannel()
    => this.channel!.Writer.Complete();
}

спасибо, насколько я понимаю, основные проблемы с моим кодом (и предлагаемыми решениями): 1) Отсутствие обработки исключений - это ясно, и я легко это решу 2) решить, использовать ли параллелизм или асинхронность - я пытаюсь чтобы вникнуть в это, но теперь я не могу понять основную разницу между ними. Мне просто нужно, чтобы мое приложение продолжало реагировать и активно выполнять другую работу, пока listener выполняет свою работу 3) UDP ненадежен - я думал, что уже буферизовал захваченную строку данных, полученнуюString = Encoding.UTF8.GetString(result.Buffer);

federico 26.04.2024 10:41

1) Вам просто нужно дождаться фоновой задачи, чтобы развернуть возникшие исключения. Я обновил пример.

BionicCode 26.04.2024 11:37

2) Асинхронный означает, что асинхронная операция не будет блокировать ваш основной поток, который продолжает выполнять другие операции во время выполнения асинхронной операции. Похоже, что асинхронная операция выполняется как фоновый поток (на самом деле фонового потока нет). Параллельность означает, что параллельная операция не будет блокировать ваш основной поток, поскольку она использует как минимум два потока, потенциально используя несколько ядер ЦП. Оба избегают замораживания потока пользовательского интерфейса. В то время как асинхронность использует один поток, параллелизм обычно использует несколько потоков ЦП, чтобы реализовать преимущества многоядерного ЦП.

BionicCode 26.04.2024 11:37

Параллелизм проблематичен, если вам нужно объединить результат или потоки получают доступ к общим ресурсам (условия гонки). Опять же, вы так и не объяснили, в чем заключаются различные действия. Использование асинхронного решения наверняка обеспечит отзывчивость вашего пользовательского интерфейса. Параллельное решение может быть плохим выбором, но это полностью зависит от ваших действий.

BionicCode 26.04.2024 11:37

3) Вы ничего не буферизировали. Encoding.UTF8.GetString просто возвращает строку из массива байтов UTF8. Он не знает, содержит ли этот буфер только, например. «Напиши мне» или «текстовое сообщение». Вам придется буферизовать ответ до тех пор, пока вы не сможете преобразовать его в разумное сообщение. Но для этого алгоритм должен знать, что такое сообщение. Здесь на помощь приходят протоколы. Протокол описывает анатомию ответа.

BionicCode 26.04.2024 11:38

Простой протокол мог бы заключаться в отправке стартовой преамбулы, которая сообщает получателю, какое сообщение он собирается получить. Если есть только один вид, преамбула не нужна. Но вам необходимо определить начало и конец сообщения. Например, сообщение начинается с «$» и заканчивается «&». Таким образом, вы узнаете, когда можно прочитать данные из буферов, чтобы получить полезное сообщение. До этого полученные массивы байтов должны храниться в массиве байтов большего размера.

BionicCode 26.04.2024 11:38

Обычно протокол описывает сообщение по его длине, при этом сообщение делится на разные блоки, содержащие разную информацию. Каждый блок имеет заранее определенную длину. Таким образом, получатель знает, сколько блоков ему нужно прочитать, чтобы получить полное сообщение. И исходя из количества блоков и их длины он знает, как получить информацию из сообщения. Возможно, вы захотите отправить JSON (более продвинутые протоколы на основе JSON см. в разделе JSON-RPC), поскольку это позволяет избежать обработки сообщений на основе байтов.

BionicCode 26.04.2024 11:42

Вы не ошибетесь, используя асинхронное решение.

BionicCode 26.04.2024 11:48

в обновленной версии вашего кода у меня проблема: ошибка CS1929 «ConfiguredTaskAwaitable» не содержит определения для «ConfigureAwait», а лучшая перегрузка метода расширения TaskAsyncEnumerableExtensions.ConfigureAwait(IAsyncDisposabl‌​e, bool)' требует приемника типа «System.IAsyncDisposable» также я не совсем понимаю, кто такой await backgroundTask.ConfigureAwait(false); в этом контексте. кстати, ваше последнее решение пока работает нормально @BionicCode

federico 26.04.2024 13:52

Спасибо за ответ. Я просто вставил повсюду ConfigurationAwait, не проверяя код дальше. Конечно, мы настроили задачу там, где мы ее ожидаем. Это означает, что в моем примере Task.Run не должен быть настроен. Я исправил оба решения.

BionicCode 26.04.2024 15:23

Извините за путаницу. ExceptionsThrownByTheBAckgroundtask — всего лишь мнимое исключение. Название этого исключения должно говорить вам, что вы можете перехватывать любые ожидаемые исключения вместо воображаемого ExceptionsThrownByTheBAckgroundtask исключения.

BionicCode 26.04.2024 15:25

Я улучшил часть catch и заменил ExceptionsThrownByTheBAckgroundtask встроенным комментарием. надеюсь, это прояснит ситуацию.

BionicCode 26.04.2024 15:27

Другие вопросы по теме