Как полностью обработать большой файл tar в памяти?

Я пытаюсь создать конечную точку в Котлине, которая принимает файлы tar размером более 10 ГБ и обрабатывает содержимое один за другим.

Файл Tar содержит миллионы файлов JSON, и я запускаю это приложение в контейнере Docker с очень ограниченным размером диска, поэтому извлечение всего архива во временный каталог не является вариантом.

Следующий подход с Apache Compress:

post(...) {
    val multipart = call.receiveMultipart()
    multipart.forEachPart { part ->
        if (part is PartData.FileItem) {
            part.streamProvider().use { inputStream ->
                BufferedInputStream(inputStream).use { bufferedInputStream ->
                    TarArchiveInputStream(bufferedInputStream).use { tarInput ->

приводит к ошибке java.io.IOException: Corrupted TAR archive. из-за предоставления данных tar в потоке вместо одной огромной переменной, содержащей все байты. Я также не могу объединить весь входной поток в одну переменную ByteArray и передать его в BufferedInputStream, потому что у меня нет 20 гигабайт памяти.

Любая помощь приветствуется.

  • входной поток -> java.io.InputStream
  • буферизованный поток -> java.io.BufferedStream

Код примера не содержит каких-либо специальных типов, принадлежащих Kotlin или Ktor.

Обновлять

Более длинный пример с отправкой файла в качестве тела POST:

call.receiveStream().buffered().use { bufferedInputStream ->
    TarArchiveInputStream(bufferedInputStream).use { tarInput ->
        var entry = tarInput.nextEntry
        while (entry != null) {
            if (!entry.isDirectory && entry.name.endsWith(".json")) {
                scope.launch {
                    val jsonString = tarInput.bufferedReader().readText()
                    val json: Map<String, JsonElement> =
                        Json.parseToJsonElement(jsonString).jsonObject

Обновление после ответа

// any stream that implements java.io.InputStream
val bodyStream = call.receiveStream()
val elems = sequence {
    bodyStream.buffered().use { bufferedInputStream ->
        TarArchiveInputStream(bufferedInputStream).use { tarInput ->
            while (true) {
                val entry = tarInput.nextEntry ?: break
                // do something with entry, yield that something, and process that something later.
                yield()
            }
        }
    }
}

Проблема заключалась в асинхронной обработке tar, это подробно объяснено в принятом ответе.

У вас ошибка появляется сразу после начала чтения, ничего не потребляя, или это происходит через какое-то время? Я не знаю этих утилит, но предоставленный вами код выглядит хорошо. Если библиотека предоставляет этот входной поток tar, то он должен иметь возможность обработки в потоковом режиме.

broot 15.08.2024 09:42

@broot Иногда выдается ошибка «Поврежденный архив TAR», иногда ошибка не возникает, а содержимое заполняется полной тарабарщиной. См.: paste.com.tr/raw/vcpscorl . Этого не произойдет, если я загружу весь архив TAR в переменную и вместо этого передам его как входной поток (проверено с небольшим TAR).

Caner - sagopanin sag kolu 15.08.2024 09:49

Я бы предположил, что это проблема с сетью/передачей данных, а не с устройством чтения tar. HTTP не очень надежен для таких огромных файлов, хотя 10 ГБ для локальной сети имеет смысл. Прежде чем приступить к детарированию чего-либо, вы можете сделать что-нибудь попроще, например подсчитать прочитанные байты или вычислить хэш.

broot 15.08.2024 09:55

Да, это локальная сеть, curl -v -F upload=@... http:/0.0.0.0:8080/..., но я получаю одну и ту же ошибку «Поврежденный архив TAR» как в малых, так и в больших выборках.

Caner - sagopanin sag kolu 15.08.2024 09:58

@broot Есть ли вероятность того, что java.io.BufferedReader может заблокировать весь ввод tar? paste.com.tr/raw/rkswbpoa

Caner - sagopanin sag kolu 15.08.2024 09:59

Опять же, я считаю, что проблема не в приведенном выше коде. BufferedInputStream — очень распространенная и хорошо протестированная утилита, используемая тысячами приложений каждый день. Это нормально. Для начала убедитесь, что вы действительно получаете отправляемые необработанные данные. Я предполагаю следующее: 1. Проблемы с сетью. 2. Многочастная обработка. Не знаю, может быть, файлы большего размера разбиваются на несколько частей? 3. Некоторое несоответствие между тем, как вы получаете данные и как вы их отправляете. В некоторых случаях Curl автоматически преобразует новые строки в отправленных файлах, может в этом дело?

broot 15.08.2024 10:25

Если вы не планируете отправлять несколько файлов и эта составная часть не является обязательным требованием, я бы рассмотрел возможность не использовать multipart, а отправить файл напрямую. Это будет проще, менее подвержено ошибкам и, возможно, более производительно, поскольку многочастный AFAIR требует отправки данных в формате base64 (или urlencode?), что занимает больше места. Ваш локон будет использовать --data-binary вместо -F. Вам также придется адаптировать серверную часть.

broot 15.08.2024 10:27

@broot, не повезло. Вывод cURL: Paste.com.tr/zdczoatz и код сервера Paste.com.tr/xkuroygl. Я получаю поток тела напрямую, но все равно получаю ошибку «поврежденный tar-архив», а содержимое файла снова перемешано и бессвязно.

Caner - sagopanin sag kolu 15.08.2024 10:50

Я обновил вопрос, указав пример кода, который вы предоставили выше, надеюсь, вы не возражаете. И я добавил ответ.

broot 15.08.2024 11:35
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
9
53
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я предполагаю, что проблема вызвана этой строкой:

scope.launch {

Мы не можем обрабатывать входной поток одновременно, потому что всякий раз, когда мы вызываем nextItem, мы повторно используем один и тот же входной поток, он просто ищет другое место. Таким образом, все одновременные потребители фактически потребляют информацию из одного и того же места.

Мы можем только последовательно читать входной поток. Если анализ JSON занимает много времени и вам по-прежнему нравится использовать несколько потоков, вы можете сначала последовательно прочитать массивы/строки байтов, а затем одновременно проанализировать их как JSON. Но чтение из самого потока должно быть последовательным.

Спасибо, чувак, спасибо! Я никак не мог понять, какого черта. Исправлено удалениемscope.launch и последовательной обработкой <3 <3.

Caner - sagopanin sag kolu 15.08.2024 11:49

Другие вопросы по теме