Контрольная точка Pyspark в Dataproc (StackOverFlowError)

Я столкнулся с ошибкой stackoveroverflow при сохранении набора данных с помощью pyspark. Я преобразую весь фрейм данных в doubletype, а затем продолжаю вычислять статистику, и я прочитал, что контрольные точки - это решение проблемы stackoverflow. Однако у меня возникли проблемы с его реализацией в dataproc.

  1. Я работаю с pyspark, и когда я проверяю фрейм данных и проверяю его с помощью df.isCheckpointed (), он возвращает false. Однако, когда я его отлаживаю, df.rdd.is_checkpointed говорит True. Есть ли проблема с пакетом / я что-то делаю не так?

  2. Я думал, что localCheckpoint более подходит для моей цели (https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint ()), поскольку моя проблема заключалась в том, что глубина DAG была слишком глубокой, но я не мог найти ни одного варианта использования. Кроме того, если я просто установил контрольную точку, RDD говорит, что это контрольная точка (как в первом вопросе), но если я попробовал localcheckpoint, он говорит, что это не так. Кто-нибудь пробовал эту функцию?

  3. После того, как я попробовал использовать локальный автономный режим, я попробовал использовать dataproc. Я пробовал и hdfs, и облачное хранилище Google, но в любом случае хранилище было пустым, но rdd говорит, что это контрольная точка.

Кто-нибудь может мне с этим помочь? Заранее спасибо!

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
519
2

Ответы 2

Если вы используете localCheckpoint, он будет записывать на локальный диск исполнителей, а не на HDFS / GCS: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint--.

Также обратите внимание, что есть активный (контрольная точка немедленно) и не нетерпеливый (контрольная точка, когда RDD фактически материализуется) режим контрольной точки. Это может повлиять на то, что возвращают эти методы. Код часто является лучшей документацией: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1615.

Как правило, к таким вопросам прилагайте образцы кода (репродукции) - так мы сможем ответить на ваш вопрос более прямо.

Что касается localcheckpoint, я читал, что он записывает на локальный диск, как вы сказали, поэтому я не настраивал для этого какой-либо каталог. Я ищу способы проверить, правильно ли я установил контрольную точку, поскольку все упомянутые мной флаги имеют значение false.

Yong Hyun Kwon 04.08.2018 08:04

Мне не знакомо слово «материализованный», но после контрольной точки я сохраняю rdd и собираю статистику, и даже после этого я не могу найти rdd в каталоге

Yong Hyun Kwon 04.08.2018 08:08

Извините, что я не могу поделиться всеми кодами, но в тот момент, когда я проверяю rdds, я просто написал dataframe.rdd.checkpoint () / dataframe.persist (), а до этого sparkcontext.setCheckpointDir ("gs: //. ...... ") для установки контрольных точек.

Yong Hyun Kwon 04.08.2018 08:12

Создайте временный каталог, используя ваш объект SparkSession:

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")


dataframe_name = # Any Spark Dataframe of type <pyspark.sql.dataframe.DataFrame>

на данный момент dataframe_name будет DAG, который вы можете сохранить как контрольную точку, например,

dataframe_checkpoint = dataframe_name.checkpoint()

dataframe_checkpoint также является искровым фреймом данных типа <pyspark.sql.dataframe.DataFrame>, но вместо DAG он хранит результат запроса.

Используйте контрольные точки, если:

  1. вычисление занимает много времени
  2. вычислительная цепочка слишком длинная
  3. зависит слишком много RDD

Другие вопросы по теме