Org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 98 на этапе 11.0 не удалась 4 раза

Я использую Google Cloud Dataproc для искровой работы, а мой редактор — Zepplin. Я пытался записать данные json в ведро gcp. Раньше это удавалось, когда я пробовал файл размером 10 МБ. Но не удалось с файлом 10 ГБ. У моего dataproc есть 1 мастер с 4 процессорами, 26 ГБ памяти, 500 ГБ на диске. 5 рабочих с одинаковым конфигом. Я думаю, он должен был быть в состоянии обрабатывать 10 ГБ данных.

Моя команда toDatabase.repartition(10).write.json("gs://mypath")

Ошибка

org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
  at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:528)
  ... 54 elided
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 98 in stage 11.0 failed 4 times, most recent failure: Lost task 98.3 in stage 11.0 (TID 3895, etl-w-2.us-east1-b.c.team-etl-234919.internal, executor 294): ExecutorLostFailure (executor 294 exited caused by one of the running tasks) Reason: Container marked as failed: container_1554684028327_0001_01_000307 on host: etl-w-2.us-east1-b.c.team-etl-234919.internal. Exit status: 143. Diagnostics: [2019-04-08 01:50:14.153]Container killed on request. Exit code is 143
[2019-04-08 01:50:14.153]Container exited with a non-zero exit code 143.
[2019-04-08 01:50:14.154]Killed by external signal

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
  ... 74 more

Любая идея о том, почему?

Будет ли ваша работа успешной, если вы запишете файл в HDFS вместо GCS?

Igor Dvorzhak 08.04.2019 06:53

Какой у вас исходный номер раздела? вы должны использовать объединение, а не перераспределение.

howie 08.04.2019 08:54

@IgorDvorzhak моя работа по записи файлов в MySQL не удалась после неудачи этой.

wwwwan 08.04.2019 15:24

Я спрашиваю, если вы измените это задание для записи в HDFS, произойдет ли сбой? Или это задание не завершается ошибкой, даже когда оно записывает в GCS, а какое-то другое задание завершается ошибкой при записи в MySQL?

Igor Dvorzhak 08.04.2019 19:12
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
11
4
33 238
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Скорее всего, вы сталкиваетесь с нехваткой памяти для рабочих процессов Spark, если они работают с меньшим набором данных, но не с большим. Проблемы с памятью для каждого рабочего будут в большей степени зависеть от вашего разделения и настроек для каждого исполнителя, а не от общей доступной памяти в масштабе всего кластера (поэтому создание большего кластера не поможет решить проблему такого типа).

Вы можете попробовать любую комбинацию из следующего:

  1. Переразбивка на большее количество разделов для вывода вместо 10
  2. Создайте кластер с highmem вместо standard машин
  3. Создайте кластер с настройками искровой памяти, которые изменяют соотношение памяти и ЦП: gcloud dataproc clusters create --properties spark:spark.executor.cores=1 например, каждый исполнитель будет выполнять только одну задачу за раз с одинаковым объемом памяти, тогда как Dataproc обычно запускает 2 исполнителя на машину и соответственно делит ЦП. На 4-ядерных машинах у вас обычно есть 2 исполнителя, и каждый исполнитель допускает 2 ядра. Этот параметр даст каждому из этих двух исполнителей только 1 ядро, при этом используя половину памяти машины.

Другие вопросы по теме

Данные, считанные из кафки в искру, исчезают после регистрации в виде таблицы?
Как обеспечить, чтобы все данные, принадлежащие пользователю, попадали в один и тот же файл при использовании spark?
Как можно преобразовать линейный список PySpark RDD в DataFrame?
Как добавить совершенно нерелевантный столбец во фрейм данных при использовании pyspark, spark + databricks
Как отправить slurm job, используя много воркеров, а не просто работая в локальном режиме?
Создать уникальный идентификатор для комбинации пары значений из двух столбцов в фрейме данных искры
Как использовать aggregateBykey для получения списка значений для каждого ключа?
Включить метрику Spark в LucidWorks Fusion
Как разобрать JSON, содержащий строковое свойство, представляющее JSON
Фильтрация кадров данных, обусловленных несколькими столбцами, с различными условиями в зависимости от значений столбца