Асинхронные задачи застряли в ожидании активации

Я написал простой инструмент нагрузочного тестирования, позволяющий отправлять 1000 запросов на медленную конечную точку в течение двух секунд. Большинство задач Async завершается, но в конечном итоге инструмент зависает, поскольку некоторые задачи зависают в состоянии WaitingForActivation, и я не могу понять, почему.

Возможно ли это из-за исчерпания потоков пула потоков? Любые рекомендации по устранению этой неполадки в дальнейшем.

Ниже мой код.

// See https://aka.ms/new-console-template for more information
using System.Threading;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;


CancellationTokenSource cts = new CancellationTokenSource();

await new Client().PerformRequestsAsync("https://www.slowendpoint.test", 1000, 1000, cts.Token);
Console.WriteLine("All Done");
public class Client
{

    public static int completedCount = 0;
    public static int headerCompletedCount = 0;
    public static int allCompletedCount = 0;
    public static int failedCompletedCount = 0;
    public async Task PerformRequestsAsync(string url, int maxConcurrency, int totalRequests, CancellationToken cancellationToken)
    {

            try
            {
                var tasks = new List<Task>();
                for (int i = 0; i < totalRequests; i++)
                {
                    if (i==500)
                        await Task.Delay(1000);

                    // Wait to proceed until it is safe to do so
                    tasks.Add(SendAsync(url, cancellationToken));
                }

                // Wait for all tasks to complete
                await Task.WhenAll(tasks);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"An error occurred: {ex.Message}");
            }
            finally
            {
                // Release the semaphore whether success or fail
            }
        
    }

    async Task SendAsync(string url, CancellationToken cancellationToken)
    {
        try
        {
            using (var httpClient = new HttpClient())
            {
                // Send the request and instruct HttpClient to complete as soon as headers are read
                var request = new HttpRequestMessage(HttpMethod.Get, url);
                var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
                Console.WriteLine("HCC: " + Interlocked.Increment(ref headerCompletedCount) + " BCC: " + completedCount + " ACC: " + allCompletedCount + " FCC: " + failedCompletedCount);

                Console.WriteLine($"Headers received: {response.StatusCode}");

                // Now read the response body in chunks of 64 KB
                const int bufferSize = 64 * 1024; // 64 KB
                var buffer = new byte[bufferSize];
                var totalRead = 0;

                using (var responseStream = await response.Content.ReadAsStreamAsync())
                {
                    int bytesRead;
                    while ((bytesRead = await responseStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
                    {
                        totalRead += bytesRead;
                        //Console.WriteLine($"Read {bytesRead} bytes this chunk, total {totalRead} bytes read.");
                        // Process the chunk as needed (omitted for brevity)
                    }
                }
                Console.WriteLine("BCC: " + Interlocked.Increment(ref completedCount) + " HCC: " + headerCompletedCount + " ACC: " + allCompletedCount + " FCC: " + failedCompletedCount);


                Console.WriteLine("Completed reading the response.");

                Console.WriteLine("ACC: " + Interlocked.Increment(ref allCompletedCount) + " BCC: " + completedCount + " HCC: " + headerCompletedCount + " FCC: " + failedCompletedCount);

            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"An error occurred: {ex.Message}");
            Console.WriteLine("FCC: " + Interlocked.Increment(ref failedCompletedCount) + " BCC: " + completedCount + " HCC: " + headerCompletedCount + " ACC: " + allCompletedCount);
        }
    }
}

Программа в какой-то момент зависает с выводом, как показано ниже.

Вывод начнется примерно с этого

FCC: 2 BCC: 3 HCC: 997 ACC: 3
Произошла ошибка: Невозможно прочитать данные из транспортного соединения: существующее соединение было принудительно закрыто удаленным хостом..
FCC: 3 BCC: 3 HCC: 997 ACC: 3
BCC: 4 HCC: 997 ACC: 3 FCC: 3
Дочитал ответ.
ACC: 4 BCC: 4 HCC: 997 FCC: 3
HCC: 998 BCC: 4 ACC: 4 FCC: 3
Получены заголовки: ОК
HCC: 999 BCC: 4 ACC: 4 FCC: 3
Получены заголовки: ОК
HCC: 1000 BCC: 4 ACC: 4 FCC: 3
Получены заголовки: ОК
BCC: 5 HCC: 1000 ACC: 4 FCC: 3
Дочитал ответ.
ACC: 5 BCC: 5 HCC: 1000 FCC: 3
Произошла ошибка: Невозможно прочитать данные из транспортного соединения: существующее соединение было принудительно закрыто удаленным хостом..

.
.
.
.
.
.
.

А затем заморозьте что-то вроде ниже

ACC: 980 BCC: 980 HCC: 1000 FCC: 5
BCC: 981 HCC: 1000 ACC: 980 FCC: 5
Дочитал ответ.
ACC: 981 BCC: 981 HCC: 1000 FCC: 5
BCC: 982 HCC: 1000 ACC: 981 FCC: 5
ACC: 982 BCC: 982 HCC: 1000 FCC: 5

Привет @GuruStron. Я знаю, это не лучшая практика, но здесь важнее обеспечить установление связи TLS при каждом запросе. Но даже при повторном использовании одного и того же клиента у меня возникает та же проблема. Цель этого вопроса — понять такое поведение. Спасибо за предложение других инструментов нагрузочного тестирования.

mohaidar 17.04.2024 21:46

Я получил ответ на свой вопрос в github github.com/dotnet/runtime/issues/101411 Я опубликую это как ответ чуть позже

mohaidar 24.04.2024 15:04
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
3
147
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы отправляете слишком много запросов одновременно, не дожидаясь ни одного из них. В какой-то момент сервер скажет: «Этот парень не отвечает, давайте просто закроем сокет».

Вам нужно передать отправку в пул потоков для обработки, используя Task.Run. В результате у вас останется отдельный цикл, который может быть занят добавлением дополнительных запросов.

var tasks = new List<Task>();
for (int i = 0; i < totalRequests; i++)
{
    if (i % 500 == 499)   // delay after only the first 500 or after every 500??
        await Task.Delay(1000);

    // Wait to proceed until it is safe to do so
    tasks.Add(Task.Run(() => SendAsync(url, cancellationToken), cancellationToken));
}

// Wait for all tasks to complete
await Task.WhenAll(tasks);

Вместо этого рассмотрите возможность использования Parallel.ForAsync или Parallel.ForEachAsync.

await Parallel.ForAsync(0, totalRequests,
    new ParallelOptions
    {
        MaxDegreeOfParallelism = 500,
        CancellationToken = cancellationToken,
    },
    async (i, ct) => await SendAsync(url, ct));

Кроме того, в вашей функции SendAsync отсутствуют using. Отсутствие using также может быть причиной вашей проблемы, поскольку оно не гарантирует надлежащую очистку. Также каждый раз используется новый HttpClient, что может привести к исчерпанию сокетов.

static HttpClient _httpClient = new HttpClient();

Также это могло бы быть более эффективно. Неясно, что именно вы делаете с этим циклом чтения, но вы можете использовать stream.CopyToAsync.

Честно говоря, не понимаю цели Task.Run(() => SendAsync(, разве await httpClient.SendAsync(request, ... не должен в принципе иметь тот же эффект (и, судя по HCC: 1000, похоже)?

Guru Stron 17.04.2024 03:42

Не знаю, что вы имеете в виду, но SendAsync сам по себе не ожидается автоматически. Если вы перейдете на Task.Run, пул потоков будет ждать вас.

Charlieface 17.04.2024 03:47

@Charlieface Parallel.ForAsync работал отлично, а при использовании Task.run я получил такое же поведение?? В чем разница, которую внес Parallel.ForAsync, все ли дело в управлении потоками пула потоков?

mohaidar 18.04.2024 01:28
Task.Run должен был работать аналогично, хотя Parallel намного лучше.
Charlieface 18.04.2024 02:02

@Charlieface, слишком много запросов одновременно, не ожидая ни одного из них - отсутствие ожидания задачи не означает, что она не была запущена. Это принцип «выстрелил и забыл» для параллельного выполнения.

Ryan 18.04.2024 03:05

Я тоже не понимаю необходимости Task.Run(() => SendAsync) в цикле. Зачем запускать поток ThreadPool всего на пару строк (создать запрос, отправить запрос), после await он возвращается обратно в клиентский код, т.е. обратно в цикл. Лучше оставить один поток для отправки запросов, поэтому Task.Run будет лишним. Тестирование на Manjaro Linux — оба варианта работают одинаково. Однако проблема ОП не воспроизводится.

Ryan 18.04.2024 03:13

Судя по моим тестам, кажется, что потоки портов завершения ввода-вывода исчерпаны, а максимальное количество доступных потоков ввода-вывода составляет 1000 (я написал простой код, чтобы это выяснить). Но я еще не понял, почему Parallel.ForAsync решил эту проблему. Похоже, что управление потоками здесь лучше, но мне нужно углубиться в него, чтобы понять, как оно работает.

mohaidar 18.04.2024 12:19

Да, он контролирует пул потоков и замедляет его до исчерпания. Потоки IOCP управляются пулом потоков.

Charlieface 18.04.2024 12:25

Я получил ответ на свой вопрос в github github.com/dotnet/runtime/issues/101411 Я опубликую это как ответ чуть позже

mohaidar 24.04.2024 15:04
Ответ принят как подходящий

После сбора дампа памяти команда .NET помогла мне разобраться в проблеме, и ниже приведена ее основная причина.

Одна из задач ожидает ответаStream.ReadAsync. Это может теоретически это займет вечность в зависимости от сети, если вы не укажете токен отмены. (Есть ожидающий ContentLengthReadStream.ReadAsync => SslStream.EnsureFullTlsFrameAsync => Задача Socket.ReadAsync ожидает дополнительных данных о неутилизированном сокете в вашем дампе)

В этом случае SureFullTlsFrameAsync ожидает базового Чтение сокета завершено. Это чтение не имеет собственного тайм-аута. поэтому сокет будет ждать:

  1. данные действительно поступают
  2. активация токена отмены
  3. ОС сообщая нам, что чтение не удалось

Когда вы просто читаете данные, ОС может не обнаружить, что соединение не работает, пока вы не попытаетесь записать на него запись. Один из способов сделать это может заключаться в том, чтобы включить поддержку TCP на уровне сокета.

В качестве обходного пути я передал токен отмены с тайм-аутом методу ReadAsync.

var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(120));

while (true)
{
    try
    {
        int bytesRead = await responseStream.ReadAsync(buffer, linkedCts.Token);
        if (bytesRead > 0)
        {
            totalRead += bytesRead;
        }
        else
        {
            break; // No more data to read, exit the loop
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Timeout occurred");
        break; // Exit the loop
    }
}

Более подробную информацию можно найти в следующем выпуске GitHub https://github.com/dotnet/runtime/issues/101411

Другие вопросы по теме