Быстрая параллельная загрузка множества больших двоичных объектов в Azure в .NET

Я ищу производительный (быстрый) и надежный (без каких-либо исключений в обычный день, кроме регулирования) способ загрузки большого количества (скажем, 1000) небольших (скажем, 2 КБ) больших двоичных объектов в Azure. Блоки Azure кажутся ненужными, поскольку мои большие двоичные объекты достаточно малы, чтобы их можно было использовать в отдельном http-вызове.

AZCopy — не вариант, поскольку я ищу решение .net, а не CLI.

Я пробовал простые подходы (например, стандартный Task.WhenAll в сочетании с UploadAsync) и более сложные подходы с использованием SemaphoreSlim и повторных попыток регулирования исключений. Несмотря на это, я всегда получаю либо случайные исключения, причину которых я не понимаю (например, System.ObjectDisposeException для объекта System.Net.Sockets.Socket), либо разочаровывающую производительность (лучшее, что я получил, было 16 секунд, что кажется очень долго ждать пакетной загрузки 2-мегабайтных объектов). Я не знаю, разумны ли мои ожидания, но я надеялся получить что-то около (numberOfBlobs/параллелизм) * 200 мс, то есть 4 секунды для 1000 больших двоичных объектов и 50 одновременных потоков.

Ниже я делюсь своим неэффективным и ненадежным кодом. Убедитесь, что вы не используете его для своих приложений.

    const int numberOfBlobs = 1000;
    private static readonly string LogFilePath = @"C:\log.txt";
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);
    private Random random = new Random();
    private static int concurrentThreads = 50;
    private static byte[] dataBytes = Encoding.UTF8.GetBytes(new string('A', 2048));

    private static BlobUploadOptions options = new BlobUploadOptions
    {
        TransferOptions = new StorageTransferOptions
        {
            // Set the maximum number of workers that 
            // may be used in a parallel transfer.
            MaximumConcurrency = concurrentThreads,

            // 1MB.
            MaximumTransferSize = 1 * 1024 * 1024
        }
    };

    public static async Task Main(string[] args)
    {
        ThreadPool.SetMinThreads(concurrentThreads + 50, concurrentThreads + 50);

        ServicePointManager.DefaultConnectionLimit = concurrentThreads;
        
        //those don't seem to have any significant effect
        //ServicePointManager.Expect100Continue = false;
        //ServicePointManager.UseNagleAlgorithm = false;
        
        var p = new Program();
        var containerClient = await p.GetBlobContainerClient(ContainerUrl, ContainerName);
        //await p.ClearContainer(containerClient);
        await p.WriteAsyncWithSemaphores(containerClient);

    }

    private static async Task UploadStreamWithMetrics(BlobClient blobClient, MemoryStream stream)
    {
        Console.WriteLine($"Starting {blobClient.Name}");

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        await blobClient.UploadAsync(stream, options);

        stopwatch.Stop();
        Console.WriteLine($"Stopping {blobClient.Name} Time: {stopwatch.ElapsedMilliseconds}");
    }


    public async Task WriteAsyncWithSemaphores(BlobContainerClient containerClient)
    {
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        var tasks = new List<Task>();
        using (var throttler = new SemaphoreSlim(concurrentThreads))
        {
            for (int i = 0; i < numberOfBlobs; i++)
            {
                await throttler.WaitAsync().ConfigureAwait(false);
                Console.WriteLine("tokens available: " + throttler.CurrentCount);
                tasks.Add(UploadBlobAsync(containerClient, throttler));
                //tasks.Add(FakeTask(containerClient, throttler));
            }

            await Task.WhenAll(tasks);
        }

        stopwatch.Stop();
        System.IO.File.AppendAllText(LogFilePath, $"Method: WriteAsyncWithSemaphores, {numberOfBlobs} blobs, {concurrentThreads} Threads, Time: {stopwatch.ElapsedMilliseconds}" + Environment.NewLine);
    }



    private async Task UploadBlobAsync(BlobContainerClient containerClient, SemaphoreSlim throttler)
    {
        try
        {
            await UploadBlobWithRetryAsync(containerClient, Guid.NewGuid().ToString());
        }
        finally
        {
            throttler.Release();
        }
    }

    private static async Task UploadBlobWithRetryAsync(BlobContainerClient containerClient, string filename)
    {
        int retryCount = 0;
        bool success = false;

        while (!success && retryCount < 3) 
        {
            try
            {
                BlobClient blobClient = containerClient.GetBlobClient(filename);
                using (var stream = new MemoryStream(dataBytes))
                {
                    await UploadStreamWithMetrics(blobClient, stream);
                }
                success = true;
            }
            catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.TooManyRequests || ex.Status == (int)HttpStatusCode.ServiceUnavailable)
            {
                retryCount++;
                await Task.Delay(RetryDelay * retryCount); 
            }
        }
    }

Обновлено: я заметил, что проблема может заключаться в том, что все доступные потоки в SemaphoreSlim не используются, но я не смог найти, что мешает их распределению.

tokens available: 46
Starting f201eaec-9c6e-4cd7-965a-fe69e831cdc2
tokens available: 45
Starting e4d4ea9b-d396-4736-a25a-570125b564d5
tokens available: 44
Starting 9a52abae-3b9e-4835-8e0a-d17bb633f476
Stopping dd8b52d5-ab94-4708-8502-d1a5cee1bdf0 Time: 143
tokens available: 44
Starting 7851ced0-22b0-46ba-8e4a-3c97452231b7
Stopping a64b86a6-fa4f-4ea8-8e32-eb4c920b46a1 Time: 123
tokens available: 44
Starting 7ea658b3-a839-4b9b-a2c5-1f4a4553de2a
tokens available: 43
Starting cbe69d17-e436-4f47-96cf-c311a26865ca
tokens available: 42
Starting 9b7aed6a-31f1-4dd5-bca3-0528a5ecb0f7
Stopping c3fe9d26-fcc7-4f3f-a4dd-c12597070a9f Time: 152
Stopping f201eaec-9c6e-4cd7-965a-fe69e831cdc2 Time: 117
tokens available: 43
Starting 77cee6e0-3e06-4260-aff4-ab42560edb7a
Stopping e4d4ea9b-d396-4736-a25a-570125b564d5 Time: 128
Stopping 9a52abae-3b9e-4835-8e0a-d17bb633f476 Time: 124
tokens available: 44
Starting 64e4da60-698b-48a2-9ea3-5a31427e300e
Stopping 7851ced0-22b0-46ba-8e4a-3c97452231b7 Time: 124
tokens available: 44
Stopping 7ea658b3-a839-4b9b-a2c5-1f4a4553de2a Time: 114
Starting bc813234-0b87-4654-8c71-88b9f0d80519
Stopping cbe69d17-e436-4f47-96cf-c311a26865ca Time: 120
tokens available: 45

В результате на самом деле он не работает с ожидаемой степенью параллелизма, поэтому, хотя отдельные вызовы выполняются быстро (около 150 мс), общая загрузка занимает около 30 с, что подтверждается приведенными выше наблюдениями: (1000 больших двоичных объектов / 5 фактических потоков используется в среднем) * 150 мс = 30 с

Я создал FakeTask, чтобы подтвердить, что все потоки обычно используются, чтобы исключить мои потенциальные проблемы с окружением:

private async Task FakeTask(BlobContainerClient containerClient, SemaphoreSlim throttler)
{
   using (var stream = new MemoryStream(dataBytes))
   {
       await Task.Delay(random.Next(3000, 3001));
   }
   throttler.Release();

}

выход:

tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0

Поскольку потоки там используются правильно, в UploadBlobWithRetryAsync должно быть что-то, что предотвращает или задерживает выделение потоков.

Я внес изменения в ваш код и загрузил 1000 больших двоичных объектов за 4–5 секунд, как показано в этом log.txt. Во многом это зависит от скорости сети.

Dasari Kamali 19.06.2024 17:26

@DasariKamali, какие изменения вы внесли, чтобы я мог увидеть, связана ли моя низкая производительность с моей сетью или с кодом? Разве вы не сталкивались время от времени с ObjectDisposeException?

stan 19.06.2024 18:53

Никаких исключений я не встречал.

Dasari Kamali 19.06.2024 23:47

Я увеличил степень параллелизма и оптимизировал настройки для улучшения скорости загрузки.

Dasari Kamali 20.06.2024 00:08
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
4
108
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Я увеличил степень параллелизма и оптимизировал настройки для улучшения скорости загрузки. Я успешно загрузил 1000 больших двоичных объектов по 2 КБ в хранилище Azure в течение 4–5 секунд без каких-либо проблем.

Код:

using System.Diagnostics;
using System.Net;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

public class Program
{
    private const int NumberOfBlobs = 1000; 
    private static readonly string LogFilePath = @"C:\log.txt"; 
    private static readonly string BlobsDirectoryPath = @"C:\blobs\"; 
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);
    private const string ConnectionString = "<storage_connec_string>";
    private const string ContainerName = "<container_name>";
    public static async Task Main(string[] args)
    {
        ServicePointManager.DefaultConnectionLimit = 1000;
        ServicePointManager.Expect100Continue = false;
        ServicePointManager.UseNagleAlgorithm = false;

        var p = new Program();
        var containerClient = await p.GetBlobContainerClient(ConnectionString, ContainerName);
        await p.ClearContainer(containerClient);
        await p.UploadBlobsInParallel(containerClient);
    }
    private async Task<BlobContainerClient> GetBlobContainerClient(string connectionString, string containerName)
    {
        var containerClient = new BlobContainerClient(connectionString, containerName);
        await containerClient.CreateIfNotExistsAsync(PublicAccessType.None);
        return containerClient;
    }
    private async Task ClearContainer(BlobContainerClient containerClient)
    {
        await foreach (var blob in containerClient.GetBlobsAsync())
        {
            await containerClient.DeleteBlobAsync(blob.Name);
        }
    }
    private async Task UploadBlobsInParallel(BlobContainerClient containerClient)
    {
        var stopwatch = Stopwatch.StartNew();
        var semaphore = new SemaphoreSlim(200);
        var tasks = new List<Task>();
        var files = Directory.GetFiles(BlobsDirectoryPath);
        foreach (var filePath in files)
        {
            if (tasks.Count >= NumberOfBlobs)
                break;
            await semaphore.WaitAsync();
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await UploadBlobWithRetryAsync(containerClient, Path.GetFileName(filePath), filePath);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
        stopwatch.Stop();
        Console.WriteLine($"{NumberOfBlobs} files uploaded successfully in {stopwatch.ElapsedMilliseconds}ms.");
        await File.AppendAllTextAsync(LogFilePath, $"Time: {stopwatch.ElapsedMilliseconds}ms{Environment.NewLine}");
    }
    private static async Task UploadBlobWithRetryAsync(BlobContainerClient containerClient, string filename, string filePath)
    {
        int retryCount = 0;
        bool success = false;
        while (!success && retryCount < 3)
        {
            try
            {
                var blobClient = containerClient.GetBlobClient(filename);
                using var stream = File.OpenRead(filePath);
                await blobClient.UploadAsync(stream, overwrite: true);
                success = true;
            }
            catch (RequestFailedException ex) when (ex.Status == 429 || ex.Status == 503) 
            {
                retryCount++;
                await Task.Delay(RetryDelay * retryCount);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Failed to upload {filename}: {ex.Message}");
                throw;
            }
        }
    }
}

Выход :

1000 BLOB-файлов успешно загружены за 5 секунд, как показано ниже.

журнал.txt:

Это зависит от скорости сети. В приведенном ниже файле log.txt вы можете видеть, что время загрузки больших двоичных объектов изменилось в зависимости от скорости сети.

1000 BLOB-объектов размером 2 КБ были успешно загружены в хранилище BLOB-объектов Azure, как показано ниже.

Я запустил ваш код, и мне потребовалось 30 секунд, так что, похоже, это проблема среды, а не проблема кода. Чего я не понимаю, так это того, что если вы видите мое РЕДАКТИРОВАНИЕ в моем сообщении, отдельные загрузки выполняются быстро (150 мс), проблема в том, что SemaphoreSlim не полностью используется в моей среде, поскольку некоторые задачи и потоки остаются неиспользованными. Однако это не систематично, поскольку моя FakeTask использовала все потоки SemaphoreSlim.

stan 20.06.2024 16:29

@stan Похоже, проблема может быть вызвана вашей средой, учитывая, что отдельные загрузки выполняются быстро. Попробуйте увеличить лимит SemaphoreSlim, чтобы полностью использовать доступные потоки и гарантировать, что никакие задачи не будут ждать без необходимости.

Dasari Kamali 21.06.2024 08:23
Ответ принят как подходящий

Через некоторое время я наконец понял, почему мой код работал неэффективно и не выделял все доступные потоки. Оказывается, когда я не запускал режим отладки, производительность улучшилась в 10 раз. Я не понимаю, почему у FakeTask было правильное распределение потоков в режиме отладки, а у задачи UploadBlobAsync - нет. Даже если для работы режима отладки необходима некоторая синхронизация, я думал, что при запуске будут использоваться доступные потоки или вести себя одинаково для разных задач, особенно потому, что у меня не было точки останова и поскольку все еще существовала некоторая степень параллелизма (5 потоков в время), так что это до сих пор загадка. Это открытие позволило мне в конечном итоге достичь общей задержки в 3 секунды для 1000 больших двоичных объектов и 50 одновременных потоков, что соответствует задержке в 150 мс, которая у меня была при загрузке в Azure.

Другие вопросы по теме

Является ли суффикс контроллера обязательным в .NET Core (.NET 6)?
EntraID: как передать роли приложения ниже по течению
Asp.net core 8 выбирает несколько с выбранным по умолчанию, не работает после обновления
Параллельные задачи. Запускайте параллельные потоки, но не ждите завершения других задач и получайте последние данные из базы данных
Ошибка «МАНИФЕСТ НЕИЗВЕСТНО» при публикации стандартного веб-приложения ASP.NET Core в приложениях-контейнерах Azure
.NET Core, недопустимое состояние ModelState в навигационных свойствах AllowNull в простой модели продуктов и категорий
Как установить тайм-аут для поиска операции с Mongo в С#
Я использую VS Code, как мне плавно перейти с ASP.NET Core из .NET 7 на .NET 8?
Как настроить решение C#, такое как внутренние зависимости и зависимости NuGet, на любом ПК без дополнительных ручных настроек, зависящих от ПК?
Настройки пересылки Yarp для динамической пересылки