Как лучше всего загрузить сериализованный IAsyncEnumerable в хранилище BLOB-объектов Azure?

Мы получаем множество записей с удаленного сервера в виде IAsyncEnumerable и после простого преобразования и сериализации JSON пытаемся загрузить результат (файл JSON) в Azure Blob Storage.

Могу ли я как-нибудь улучшить этот код, чтобы уменьшить максимальное использование памяти. Количество записей, которые мы получаем с удаленного сервера, велико.

public async Task ExecuteAsync()
{
    var items = GetItems();

    using var memoryStream = new MemoryStream();
    await JsonSerializer.SerializeAsync(memoryStream, items);

    var blobContainerClient = new Azure.Storage.Blobs.BlobContainerClient("connectionString", "container");
    var blobClient = blobContainerClient.GetBlobClient("blobName");

    memoryStream.Position = 0;
    await blobClient.UploadAsync(memoryStream, overwrite: true);
}

private static async IAsyncEnumerable<Item> GetItems()
{
    // This is calling a remote server that returns IAsyncEnumerable<Row>
    await foreach (var row in GetIAsyncEnumerable())
    {
        yield return new Item(row);
    }
}

До тех пор, пока с сериализацией JSON все не будет в порядке, мы получаем записи одну за другой, а преобразование типа объекта и сериализация также происходят одна за другой.

Но после await JsonSerializer.SerializeAsync(memoryStream, items) мы ждем, пока все элементы закончатся и материализуются в памяти, а затем начинаем загрузку.

Есть ли способ продолжить эту цепочку IAsyncEnumerable и начать загрузку, как только будет получен первый элемент, и продолжить загрузку для каждого нового объекта?

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
0
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете напрямую открыть поток для большого двоичного объекта и сериализовать в него элементы.

public async Task ExecuteAsync()
{
    var blobContainerClient = new BlobContainerClient("connectionString", "container");
    var blobClient = blobContainerClient.GetBlobClient("blobName");

    var items = GetItems();
    await using var blobStream = await blobClient.OpenWriteAsync(overwrite: true);
    await JsonSerializer.SerializeAsync(blobStream, items);
}

Чтобы иметь возможность записывать элементы по мере их появления, вам понадобится что-то вроде этого:

public async Task ExecuteAsync()
{
    var blobContainerClient = new BlobContainerClient("connectionString", "container");
    var blobClient = blobContainerClient.GetBlobClient("blobName");

    var items = GetItems();
    await using var blobStream = await blobClient.OpenWriteAsync(overwrite: true);
    await using var writer = new Utf8JsonWriter(blobStream, new JsonWriterOptions { Indented = true });

    writer.WriteStartArray();
    await foreach (var item in items)
    {
        JsonSerializer.Serialize(writer, item);
        await writer.FlushAsync(); // If you want the item immediately to be available in the blob. For better performance remove.
    }
    writer.WriteEndArray();

    await writer.FlushAsync();
}

Неужели нужно FlushAsync() после каждого предмета?

Good Night Nerd Pride 08.05.2024 13:58

@Магнус, спасибо за ответ. Я изменил SerializeAsync на Serialize во втором примере, чтобы все заработало. Кажется, это очень хороший подход, но результат очень медленный, примерно на два-три порядка медленнее, чем оригинал, и я не знаю, почему.

ctyar 08.05.2024 14:21

Попробуйте убрать флеш в цикле. Затем он будет сброшен, когда буфер заполнится.

Magnus 08.05.2024 14:36

Другие вопросы по теме

Запуск конвейера YAML с помощью C# Azure DevOps SDK
Как использовать управляемую идентификацию для аутентификации вызовов Azure REST API с помощью Azure SDK
Как создать или обновить сертификат с помощью последней версии SDK
Проблемы с установкой сертификата TLS в службе приложений Azure с использованием последней версии пакета SDK
GetNextAsTwinAsync() случайным образом дает сбой после развертывания в приложении-функции Azure
Azure.Identity.AuthenticationFailedException: ответ не задан, убедитесь, что был вызван SendAsync
Можно ли предоставить разрешение на создание группы ресурсов субъекту-службе, если подписки недоступны?
DNS Azure Resource Manager: пример кода для создания записи DNS
Утверждения вошедшего пользователя отсутствуют на странице профиля в веб-приложении ASP.NET Core, вызывающем Microsoft Graph
EventHubClient.Send: сообщение не может быть отправлено, так как оно либо получено по ссылке, либо уже отправлено по ссылке