S3AsyncClient и AsyncResponseTransformer поддерживают противодавление во время загрузки

Я создал типичный API загрузки, используя стек Spring Reactive и AWS Java SDK v2. По сути, есть контроллер, который вызывает s3AsyncClient для загрузки.

@GetMapping(path = "/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {    
    GetObjectRequest request = GetObjectRequest.builder()
      .bucket(s3config.getBucket())
      .key(filekey)
      .build();
    
    return Mono.fromFuture(s3client.getObject(request, AsyncResponseTransformer.toPublisher()))
      .map(response -> {
        checkResult(response.response());
        String filename = getMetadataItem(response.response(),"filename",filekey);            
        return ResponseEntity.ok()
          .header(HttpHeaders.CONTENT_TYPE, response.response().contentType())
          .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.response().contentLength()))
          .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
          .body(Flux.from(response));
      });
}

Javadoc для этого AsyncResponseTransformer.toPublisher() интерфейса издателя включает в себя следующее:

Вы несете ответственность за подписку на этого издателя и управление соответствующее противодавление. Таким образом, этот трансформатор является всего лишь рекомендуется для расширенных случаев использования.

Netty настроена на использование метода Direct No Cleaner, т.е. выделяет DirectByteBuffers вместо HeapBuffers, а также использует UNSAFE для выделения/освобождения буферов.

-Dio.netty.maxDirectMemory составляет 2 или 3 ГБ (проверено различное поведение).

Я вижу, что время от времени возникают ошибки OutOfDirectMemory и соединение разрывается. Клиент получает преждевременное завершение потока контента.

Похоже, что S3AsyncClient может превосходить потребителей данных и направлять переполнение буферов, независимо от того, сколько памяти я даю netty. JVM остается неизменным на уровне около 300 МБ.

Я наткнулся на это для Netty: OOM убил JVM с помощью объектов Netty DirectByteBuffer размером 320 x 16 МБ

Вы не можете контролировать объем памяти, если не вызываете OOM, как вы это сделали. Пул Netty не будет вести себя как Java GC по сравнению с кучей, т.е. увеличивать некоторое регулирование/частоту его работы, чтобы использовать ресурсы в заданных пределах (выбрасывая OOM только при определенных обстоятельствах). Пул памяти Netty создан для имитации поведения встроенного распределителя, например jemalloc, поэтому его цель — сохранить столько памяти, сколько необходимо приложению для работы. По этой причине объем сохраняемой прямой памяти зависит от нагрузки на распределение, которую выполняет код приложения, т. е. от количества незавершенных выделений без освобождения.

Я предлагаю вместо того, чтобы осознать его природу, подготовить интересный тест. загрузите предварительную/тестовую машину и просто отслеживайте Netty Direct использование памяти интересующим вас приложением. Полагаю, вы настроен -Dio.netty.maxDirectMemory=0 для использования JMX чтобы раскрыть используемую прямую память, но Netty может предоставить свою собственную метрики (сохраняя настройку io.netty.maxDirectMemory), просто проверьте что библиотеки, которые его используют, позаботятся об открытии через JMX или используя любую структуру метрик. Если эти приложения не будут раскрывать API довольно прост в использовании, см. https://netty.io/4.1/api/io/netty/buffer/PooledByteBufAllocatorMetric.html

Я использую netty 4.1.89 или 4.1.108 (пробовал обновить) AWS SDK v2 2.23.21 И клиент AWS CRT 0.29.14 (последняя версия)

Я попробовал сделать Flux.from(response).rateLimit(1)безуспешно.

Мой тест производительности заключается в параллельной загрузке файлов размером 500 МБ с участием до 40 пользователей. Узел имеет 8 ГБ оперативной памяти и 1 процессорный модуль.

Я понимаю, что этого недостаточно для обработки всех пользователей, но я ожидал, что он автоматически создаст противодавление и будет продолжать потоковую передачу файлов медленнее, т. е. получить следующий буфер из S3 -> записать следующий буфер в пользователя 1, получить следующий буфер из s3 -> записать пользователю2 и т. д.

Однако даже когда я использую только один медленный потребитель, я вижу, что Netty сообщает о прямом потреблении памяти до 500 МБ, а если я остановлю, оно упадет до 16 МБ (я полагаю, кеш PoolArena по умолчанию). Итак, похоже, что S3 Async Client помещает все 500 МБ в прямые буферы Netty, и клиент медленно их истощает.

Попытка ограничить пропускную способность AWS CRT: targetThroughputInGbps(0.1) не помогло.

У меня такое ощущение, что S3AsyncClient+CRT+spring boot netty не обрабатывает противодавление автоматически. https://github.com/netty/netty/issues/13751

Поскольку я не могу контролировать скорость загрузки со стороны клиента (может быть медленное или быстрое соединение), как я могу поддерживать противодавление, чтобы поддерживать прямые буферы на определенном пределе? Это вообще возможно?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
158
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я открыл проблему с aws sdk: https://github.com/aws/aws-sdk-java-v2/issues/5158

В то же время я обнаружил причину: асинхронный клиент s3 (независимо от лежащего в его основе http-клиента) уважает request(n), сделанный на Flux<ByteBuffer> с помощью реактора-нетти. Проблема в том, что размер фрагмента у разных клиентов разный.

s3-CRT по умолчанию использует чанк 8MB.

s3-Netty по умолчанию использует чанк 8KB, то есть размер в 1024 раза меньший.

Reactor-netty сначала запрашивает 128 предметы, а затем пополняет их 64. (@см. MonoSendMany и MonoSendMAX_SIZE/REFILL_SIZE).

Теперь, если ваш потребитель достаточно медленный и загружает большой файл, запросы Reactor-Netty 128 * 8 = 1024MB от s3-crt и, в конечном итоге, буферы Reactor-Netty заполняются этими данными, даже если для канала WRITABILITY_CHANGED установлено значение false.

А если вы загружаете несколько файлов, легко упираться в стену максимального ограничения прямой памяти.

Поскольку MAX_SIZE/REFILL_SIZE — это жестко запрограммированные статические поля в реакторе-netty, единственным решением является уменьшение размера части/куска S3 с помощью:

S3AsyncClient.crtBuilder()
    .minimumPartSizeInBytes(1L * 1024 * 1024) // 1 MB

Это позволит S3-crt помещать 128 * 1 = 128MB max в буфер реактора-нетти при каждом запросе на загрузку. Хотя это может замедлить общую пропускную способность/производительность асинхронного клиента s3 и загрузок, это помогает поддерживать большее количество параллельных загрузок без сбоев OutOfDirectMemoryError.

Это скорее обходной путь, чем решение, но пока не будет способа настроить противодавление в реакторе MAX_SIZE/REFILL_SIZE, мне придется его использовать.

Другие вопросы по теме