Я ищу производительный (быстрый) и надежный (без каких-либо исключений в обычный день, кроме регулирования) способ загрузки большого количества (скажем, 1000) небольших (скажем, 2 КБ) больших двоичных объектов в Azure. Блоки Azure кажутся ненужными, поскольку мои большие двоичные объекты достаточно малы, чтобы их можно было использовать в отдельном http-вызове.
AZCopy — не вариант, поскольку я ищу решение .net, а не CLI.
Я пробовал простые подходы (например, стандартный Task.WhenAll в сочетании с UploadAsync) и более сложные подходы с использованием SemaphoreSlim и повторных попыток регулирования исключений. Несмотря на это, я всегда получаю либо случайные исключения, причину которых я не понимаю (например, System.ObjectDisposeException для объекта System.Net.Sockets.Socket), либо разочаровывающую производительность (лучшее, что я получил, было 16 секунд, что кажется очень долго ждать пакетной загрузки 2-мегабайтных объектов). Я не знаю, разумны ли мои ожидания, но я надеялся получить что-то около (numberOfBlobs/параллелизм) * 200 мс, то есть 4 секунды для 1000 больших двоичных объектов и 50 одновременных потоков.
Ниже я делюсь своим неэффективным и ненадежным кодом. Убедитесь, что вы не используете его для своих приложений.
const int numberOfBlobs = 1000;
private static readonly string LogFilePath = @"C:\log.txt";
private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);
private Random random = new Random();
private static int concurrentThreads = 50;
private static byte[] dataBytes = Encoding.UTF8.GetBytes(new string('A', 2048));
private static BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
// Set the maximum number of workers that
// may be used in a parallel transfer.
MaximumConcurrency = concurrentThreads,
// 1MB.
MaximumTransferSize = 1 * 1024 * 1024
}
};
public static async Task Main(string[] args)
{
ThreadPool.SetMinThreads(concurrentThreads + 50, concurrentThreads + 50);
ServicePointManager.DefaultConnectionLimit = concurrentThreads;
//those don't seem to have any significant effect
//ServicePointManager.Expect100Continue = false;
//ServicePointManager.UseNagleAlgorithm = false;
var p = new Program();
var containerClient = await p.GetBlobContainerClient(ContainerUrl, ContainerName);
//await p.ClearContainer(containerClient);
await p.WriteAsyncWithSemaphores(containerClient);
}
private static async Task UploadStreamWithMetrics(BlobClient blobClient, MemoryStream stream)
{
Console.WriteLine($"Starting {blobClient.Name}");
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
await blobClient.UploadAsync(stream, options);
stopwatch.Stop();
Console.WriteLine($"Stopping {blobClient.Name} Time: {stopwatch.ElapsedMilliseconds}");
}
public async Task WriteAsyncWithSemaphores(BlobContainerClient containerClient)
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var tasks = new List<Task>();
using (var throttler = new SemaphoreSlim(concurrentThreads))
{
for (int i = 0; i < numberOfBlobs; i++)
{
await throttler.WaitAsync().ConfigureAwait(false);
Console.WriteLine("tokens available: " + throttler.CurrentCount);
tasks.Add(UploadBlobAsync(containerClient, throttler));
//tasks.Add(FakeTask(containerClient, throttler));
}
await Task.WhenAll(tasks);
}
stopwatch.Stop();
System.IO.File.AppendAllText(LogFilePath, $"Method: WriteAsyncWithSemaphores, {numberOfBlobs} blobs, {concurrentThreads} Threads, Time: {stopwatch.ElapsedMilliseconds}" + Environment.NewLine);
}
private async Task UploadBlobAsync(BlobContainerClient containerClient, SemaphoreSlim throttler)
{
try
{
await UploadBlobWithRetryAsync(containerClient, Guid.NewGuid().ToString());
}
finally
{
throttler.Release();
}
}
private static async Task UploadBlobWithRetryAsync(BlobContainerClient containerClient, string filename)
{
int retryCount = 0;
bool success = false;
while (!success && retryCount < 3)
{
try
{
BlobClient blobClient = containerClient.GetBlobClient(filename);
using (var stream = new MemoryStream(dataBytes))
{
await UploadStreamWithMetrics(blobClient, stream);
}
success = true;
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.TooManyRequests || ex.Status == (int)HttpStatusCode.ServiceUnavailable)
{
retryCount++;
await Task.Delay(RetryDelay * retryCount);
}
}
}
Обновлено: я заметил, что проблема может заключаться в том, что все доступные потоки в SemaphoreSlim не используются, но я не смог найти, что мешает их распределению.
tokens available: 46
Starting f201eaec-9c6e-4cd7-965a-fe69e831cdc2
tokens available: 45
Starting e4d4ea9b-d396-4736-a25a-570125b564d5
tokens available: 44
Starting 9a52abae-3b9e-4835-8e0a-d17bb633f476
Stopping dd8b52d5-ab94-4708-8502-d1a5cee1bdf0 Time: 143
tokens available: 44
Starting 7851ced0-22b0-46ba-8e4a-3c97452231b7
Stopping a64b86a6-fa4f-4ea8-8e32-eb4c920b46a1 Time: 123
tokens available: 44
Starting 7ea658b3-a839-4b9b-a2c5-1f4a4553de2a
tokens available: 43
Starting cbe69d17-e436-4f47-96cf-c311a26865ca
tokens available: 42
Starting 9b7aed6a-31f1-4dd5-bca3-0528a5ecb0f7
Stopping c3fe9d26-fcc7-4f3f-a4dd-c12597070a9f Time: 152
Stopping f201eaec-9c6e-4cd7-965a-fe69e831cdc2 Time: 117
tokens available: 43
Starting 77cee6e0-3e06-4260-aff4-ab42560edb7a
Stopping e4d4ea9b-d396-4736-a25a-570125b564d5 Time: 128
Stopping 9a52abae-3b9e-4835-8e0a-d17bb633f476 Time: 124
tokens available: 44
Starting 64e4da60-698b-48a2-9ea3-5a31427e300e
Stopping 7851ced0-22b0-46ba-8e4a-3c97452231b7 Time: 124
tokens available: 44
Stopping 7ea658b3-a839-4b9b-a2c5-1f4a4553de2a Time: 114
Starting bc813234-0b87-4654-8c71-88b9f0d80519
Stopping cbe69d17-e436-4f47-96cf-c311a26865ca Time: 120
tokens available: 45
В результате на самом деле он не работает с ожидаемой степенью параллелизма, поэтому, хотя отдельные вызовы выполняются быстро (около 150 мс), общая загрузка занимает около 30 с, что подтверждается приведенными выше наблюдениями: (1000 больших двоичных объектов / 5 фактических потоков используется в среднем) * 150 мс = 30 с
Я создал FakeTask, чтобы подтвердить, что все потоки обычно используются, чтобы исключить мои потенциальные проблемы с окружением:
private async Task FakeTask(BlobContainerClient containerClient, SemaphoreSlim throttler)
{
using (var stream = new MemoryStream(dataBytes))
{
await Task.Delay(random.Next(3000, 3001));
}
throttler.Release();
}
выход:
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
Поскольку потоки там используются правильно, в UploadBlobWithRetryAsync должно быть что-то, что предотвращает или задерживает выделение потоков.
@DasariKamali, какие изменения вы внесли, чтобы я мог увидеть, связана ли моя низкая производительность с моей сетью или с кодом? Разве вы не сталкивались время от времени с ObjectDisposeException?
Никаких исключений я не встречал.
Я увеличил степень параллелизма и оптимизировал настройки для улучшения скорости загрузки.
Я увеличил степень параллелизма и оптимизировал настройки для улучшения скорости загрузки. Я успешно загрузил 1000 больших двоичных объектов по 2 КБ в хранилище Azure в течение 4–5 секунд без каких-либо проблем.
Код:
using System.Diagnostics;
using System.Net;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
public class Program
{
private const int NumberOfBlobs = 1000;
private static readonly string LogFilePath = @"C:\log.txt";
private static readonly string BlobsDirectoryPath = @"C:\blobs\";
private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);
private const string ConnectionString = "<storage_connec_string>";
private const string ContainerName = "<container_name>";
public static async Task Main(string[] args)
{
ServicePointManager.DefaultConnectionLimit = 1000;
ServicePointManager.Expect100Continue = false;
ServicePointManager.UseNagleAlgorithm = false;
var p = new Program();
var containerClient = await p.GetBlobContainerClient(ConnectionString, ContainerName);
await p.ClearContainer(containerClient);
await p.UploadBlobsInParallel(containerClient);
}
private async Task<BlobContainerClient> GetBlobContainerClient(string connectionString, string containerName)
{
var containerClient = new BlobContainerClient(connectionString, containerName);
await containerClient.CreateIfNotExistsAsync(PublicAccessType.None);
return containerClient;
}
private async Task ClearContainer(BlobContainerClient containerClient)
{
await foreach (var blob in containerClient.GetBlobsAsync())
{
await containerClient.DeleteBlobAsync(blob.Name);
}
}
private async Task UploadBlobsInParallel(BlobContainerClient containerClient)
{
var stopwatch = Stopwatch.StartNew();
var semaphore = new SemaphoreSlim(200);
var tasks = new List<Task>();
var files = Directory.GetFiles(BlobsDirectoryPath);
foreach (var filePath in files)
{
if (tasks.Count >= NumberOfBlobs)
break;
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
await UploadBlobWithRetryAsync(containerClient, Path.GetFileName(filePath), filePath);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
stopwatch.Stop();
Console.WriteLine($"{NumberOfBlobs} files uploaded successfully in {stopwatch.ElapsedMilliseconds}ms.");
await File.AppendAllTextAsync(LogFilePath, $"Time: {stopwatch.ElapsedMilliseconds}ms{Environment.NewLine}");
}
private static async Task UploadBlobWithRetryAsync(BlobContainerClient containerClient, string filename, string filePath)
{
int retryCount = 0;
bool success = false;
while (!success && retryCount < 3)
{
try
{
var blobClient = containerClient.GetBlobClient(filename);
using var stream = File.OpenRead(filePath);
await blobClient.UploadAsync(stream, overwrite: true);
success = true;
}
catch (RequestFailedException ex) when (ex.Status == 429 || ex.Status == 503)
{
retryCount++;
await Task.Delay(RetryDelay * retryCount);
}
catch (Exception ex)
{
Console.WriteLine($"Failed to upload {filename}: {ex.Message}");
throw;
}
}
}
}
Выход :
1000 BLOB-файлов успешно загружены за 5 секунд, как показано ниже.
журнал.txt:
Это зависит от скорости сети. В приведенном ниже файле log.txt вы можете видеть, что время загрузки больших двоичных объектов изменилось в зависимости от скорости сети.
1000 BLOB-объектов размером 2 КБ были успешно загружены в хранилище BLOB-объектов Azure, как показано ниже.
Я запустил ваш код, и мне потребовалось 30 секунд, так что, похоже, это проблема среды, а не проблема кода. Чего я не понимаю, так это того, что если вы видите мое РЕДАКТИРОВАНИЕ в моем сообщении, отдельные загрузки выполняются быстро (150 мс), проблема в том, что SemaphoreSlim не полностью используется в моей среде, поскольку некоторые задачи и потоки остаются неиспользованными. Однако это не систематично, поскольку моя FakeTask использовала все потоки SemaphoreSlim.
@stan Похоже, проблема может быть вызвана вашей средой, учитывая, что отдельные загрузки выполняются быстро. Попробуйте увеличить лимит SemaphoreSlim, чтобы полностью использовать доступные потоки и гарантировать, что никакие задачи не будут ждать без необходимости.
Через некоторое время я наконец понял, почему мой код работал неэффективно и не выделял все доступные потоки. Оказывается, когда я не запускал режим отладки, производительность улучшилась в 10 раз. Я не понимаю, почему у FakeTask было правильное распределение потоков в режиме отладки, а у задачи UploadBlobAsync - нет. Даже если для работы режима отладки необходима некоторая синхронизация, я думал, что при запуске будут использоваться доступные потоки или вести себя одинаково для разных задач, особенно потому, что у меня не было точки останова и поскольку все еще существовала некоторая степень параллелизма (5 потоков в время), так что это до сих пор загадка. Это открытие позволило мне в конечном итоге достичь общей задержки в 3 секунды для 1000 больших двоичных объектов и 50 одновременных потоков, что соответствует задержке в 150 мс, которая у меня была при загрузке в Azure.
Я внес изменения в ваш код и загрузил 1000 больших двоичных объектов за 4–5 секунд, как показано в этом log.txt. Во многом это зависит от скорости сети.