Обновление: мне нужно было увеличить память на узлах Dataproc, но я не мог попасть в пользовательский интерфейс Spark по разным причинам, чтобы понять, почему умирали исполнители. Возврат к этому проекту с немного большим опытом работы со Spark и GCP позволил мне быстро решить проблему.
====
Я долгое время пытался запустить фазу прогнозирования рекомендательной модели ALS в pyspark на Dataproc. Обновление: подтверждено, что этот код успешно работает на Databricks.
Код:
spark = SparkSession.builder.appName("test-mf").getOrCreate()
model = ALSModel.load("gs://my-dataproc-bucket/trained-model")
userRecs = model.recommendForAllUsers(100).collect()
(Я просто выполняю «сбор», поскольку это кажется самой простой операцией, чтобы заставить код действительно работать - изначально я делал несколько операторов select, чтобы попытаться обработать данные, и это тоже не помогло.)
Я получаю массу сообщений об ошибках за относительно короткий промежуток времени (может быть, за 15 минут от начала работы до окончательного отказа), ни одно из которых не имеет большого значения для меня и не принесло легкого дымящегося пистолета от поиска в Google.
Вот последний набор журналов, дайте мне знать, если вам что-нибудь понадобится раньше:
18/03/27 22:38:59 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
Traceback (most recent call last):
File "/tmp/be3c5758e6694a4ca7f2911043f7a173/spark-matrix-factorization.py", line 35, in <module>
userRecs = model.recommendForAllUsers(100).collect()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 438, in collect
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o50.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 26, my-dataproc-cluster-w-1.c.my-gcp-project.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520973147661_0018_01_000012 on host: my-dataproc-cluster-w-1.c.my-gcp-project.internal. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1520973147661_0018_01_000012
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
at org.apache.hadoop.util.Shell.run(Shell.java:869)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 1
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
18/03/27 22:38:59 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@446a8845{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/03/27 22:38:59 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 10 idle
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [be3c5758e6694a4ca7f2911043f7a173] entered state [ERROR] while waiting for [DONE].
Я пытался увидеть, есть ли где-нибудь еще журналы, которые могут дать мне более информативное сообщение об ошибке, но мне не удалось настроить прокси-сервер для просмотра пользовательского интерфейса в Dataproc, и не нашел никаких сообщений после запуска диагностики кластеров gcloud dataproc .
В ответ на Денниса ниже:
Типы машин:
Главный узел
Стандарт (1 мастер, N рабочих)
Тип аппарата
n1-standard-4 (4 виртуальных ЦП, 15,0 ГБ памяти)
Размер основного диска
500 ГБ
Рабочие узлы
2
Тип аппарата
n1-standard-4 (4 виртуальных ЦП, 15,0 ГБ памяти)
Размер основного диска
500 ГБ
Локальные SSD
0
Размер данных:
Вся обученная модель ALS (которая уже содержит все данные) занимает всего 104M.
Счетчик вместо сбора дает аналогичную проблему:
18/03/28 22:37:08 ERROR org.apache.spark.scheduler.TaskSetManager: Task 3 in stage 4.0 failed 4 times; aborting job
18/03/28 22:37:08 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.3 in stage 4.0 (TID 18, my-dataproc-cluster-w-1.c.my-dataproc-cluster.internal, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520973147661_0019_01_000008 on host: my-dataproc-cluster-w-1.c.my-dataproc-cluster.internal. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1520973147661_0019_01_000008
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
at org.apache.hadoop.util.Shell.run(Shell.java:869)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 1
18/03/28 22:37:08 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
18/03/28 22:37:08 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 6 idle
Traceback (most recent call last):
File "/tmp/9d05f24785474f1f84720daa115af584/spark-matrix-factorization.py", line 35, in <module>
userRecs = model.recommendForAllUsers(100).count()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 427, in count
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o50.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 4 times, most recent failure: Lost task 3.3 in stage 4.0 (TID 19, my-dataproc-cluster-w-1.c.my-dataproc-cluster.internal, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520973147661_0019_01_000008 on host: my-dataproc-cluster-w-1.c.my-dataproc-cluster.internal. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1520973147661_0019_01_000008
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
at org.apache.hadoop.util.Shell.run(Shell.java:869)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 1
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
18/03/28 22:37:08 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@ee58b0b{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [9d05f24785474f1f84720daa115af584] entered state [ERROR] while waiting for [DONE].
@DennisHuo обновил вопрос ответами на ваши вопросы.
Есть ли какие-нибудь заметные отличия в настройке Databricks: версия Spark, память узла и т. д.? Вы экспериментировали с другими кластерами?
Какие типы машин вы используете? Насколько велик ваш набор данных? Что произойдет, если вы вызовете count () вместо collect ()? Collect () отправит весь распределенный набор данных в вашу единственную программу-драйвер, что может быть OOMing.