Я пытаюсь создать конечную точку в Котлине, которая принимает файлы tar размером более 10 ГБ и обрабатывает содержимое один за другим.
Файл Tar содержит миллионы файлов JSON, и я запускаю это приложение в контейнере Docker с очень ограниченным размером диска, поэтому извлечение всего архива во временный каталог не является вариантом.
Следующий подход с Apache Compress:
post(...) {
val multipart = call.receiveMultipart()
multipart.forEachPart { part ->
if (part is PartData.FileItem) {
part.streamProvider().use { inputStream ->
BufferedInputStream(inputStream).use { bufferedInputStream ->
TarArchiveInputStream(bufferedInputStream).use { tarInput ->
приводит к ошибке java.io.IOException: Corrupted TAR archive.
из-за предоставления данных tar
в потоке вместо одной огромной переменной, содержащей все байты. Я также не могу объединить весь входной поток в одну переменную ByteArray
и передать его в BufferedInputStream
, потому что у меня нет 20 гигабайт памяти.
Любая помощь приветствуется.
java.io.InputStream
java.io.BufferedStream
Код примера не содержит каких-либо специальных типов, принадлежащих Kotlin или Ktor.
Обновлять
Более длинный пример с отправкой файла в качестве тела POST:
call.receiveStream().buffered().use { bufferedInputStream ->
TarArchiveInputStream(bufferedInputStream).use { tarInput ->
var entry = tarInput.nextEntry
while (entry != null) {
if (!entry.isDirectory && entry.name.endsWith(".json")) {
scope.launch {
val jsonString = tarInput.bufferedReader().readText()
val json: Map<String, JsonElement> =
Json.parseToJsonElement(jsonString).jsonObject
Обновление после ответа
// any stream that implements java.io.InputStream
val bodyStream = call.receiveStream()
val elems = sequence {
bodyStream.buffered().use { bufferedInputStream ->
TarArchiveInputStream(bufferedInputStream).use { tarInput ->
while (true) {
val entry = tarInput.nextEntry ?: break
// do something with entry, yield that something, and process that something later.
yield()
}
}
}
}
Проблема заключалась в асинхронной обработке tar, это подробно объяснено в принятом ответе.
@broot Иногда выдается ошибка «Поврежденный архив TAR», иногда ошибка не возникает, а содержимое заполняется полной тарабарщиной. См.: paste.com.tr/raw/vcpscorl . Этого не произойдет, если я загружу весь архив TAR в переменную и вместо этого передам его как входной поток (проверено с небольшим TAR).
Я бы предположил, что это проблема с сетью/передачей данных, а не с устройством чтения tar. HTTP не очень надежен для таких огромных файлов, хотя 10 ГБ для локальной сети имеет смысл. Прежде чем приступить к детарированию чего-либо, вы можете сделать что-нибудь попроще, например подсчитать прочитанные байты или вычислить хэш.
Да, это локальная сеть, curl -v -F upload=@... http:/0.0.0.0:8080/...
, но я получаю одну и ту же ошибку «Поврежденный архив TAR» как в малых, так и в больших выборках.
@broot Есть ли вероятность того, что java.io.BufferedReader
может заблокировать весь ввод tar? paste.com.tr/raw/rkswbpoa
Опять же, я считаю, что проблема не в приведенном выше коде. BufferedInputStream
— очень распространенная и хорошо протестированная утилита, используемая тысячами приложений каждый день. Это нормально. Для начала убедитесь, что вы действительно получаете отправляемые необработанные данные. Я предполагаю следующее: 1. Проблемы с сетью. 2. Многочастная обработка. Не знаю, может быть, файлы большего размера разбиваются на несколько частей? 3. Некоторое несоответствие между тем, как вы получаете данные и как вы их отправляете. В некоторых случаях Curl автоматически преобразует новые строки в отправленных файлах, может в этом дело?
Если вы не планируете отправлять несколько файлов и эта составная часть не является обязательным требованием, я бы рассмотрел возможность не использовать multipart, а отправить файл напрямую. Это будет проще, менее подвержено ошибкам и, возможно, более производительно, поскольку многочастный AFAIR требует отправки данных в формате base64 (или urlencode?), что занимает больше места. Ваш локон будет использовать --data-binary
вместо -F
. Вам также придется адаптировать серверную часть.
@broot, не повезло. Вывод cURL: Paste.com.tr/zdczoatz и код сервера Paste.com.tr/xkuroygl. Я получаю поток тела напрямую, но все равно получаю ошибку «поврежденный tar-архив», а содержимое файла снова перемешано и бессвязно.
Я обновил вопрос, указав пример кода, который вы предоставили выше, надеюсь, вы не возражаете. И я добавил ответ.
Я предполагаю, что проблема вызвана этой строкой:
scope.launch {
Мы не можем обрабатывать входной поток одновременно, потому что всякий раз, когда мы вызываем nextItem
, мы повторно используем один и тот же входной поток, он просто ищет другое место. Таким образом, все одновременные потребители фактически потребляют информацию из одного и того же места.
Мы можем только последовательно читать входной поток. Если анализ JSON занимает много времени и вам по-прежнему нравится использовать несколько потоков, вы можете сначала последовательно прочитать массивы/строки байтов, а затем одновременно проанализировать их как JSON. Но чтение из самого потока должно быть последовательным.
Спасибо, чувак, спасибо! Я никак не мог понять, какого черта. Исправлено удалениемscope.launch и последовательной обработкой <3 <3.
У вас ошибка появляется сразу после начала чтения, ничего не потребляя, или это происходит через какое-то время? Я не знаю этих утилит, но предоставленный вами код выглядит хорошо. Если библиотека предоставляет этот входной поток tar, то он должен иметь возможность обработки в потоковом режиме.