Я столкнулся с ошибкой stackoveroverflow при сохранении набора данных с помощью pyspark. Я преобразую весь фрейм данных в doubletype, а затем продолжаю вычислять статистику, и я прочитал, что контрольные точки - это решение проблемы stackoverflow. Однако у меня возникли проблемы с его реализацией в dataproc.
Я работаю с pyspark, и когда я проверяю фрейм данных и проверяю его с помощью df.isCheckpointed (), он возвращает false. Однако, когда я его отлаживаю, df.rdd.is_checkpointed говорит True. Есть ли проблема с пакетом / я что-то делаю не так?
Я думал, что localCheckpoint более подходит для моей цели (https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint ()), поскольку моя проблема заключалась в том, что глубина DAG была слишком глубокой, но я не мог найти ни одного варианта использования. Кроме того, если я просто установил контрольную точку, RDD говорит, что это контрольная точка (как в первом вопросе), но если я попробовал localcheckpoint, он говорит, что это не так. Кто-нибудь пробовал эту функцию?
После того, как я попробовал использовать локальный автономный режим, я попробовал использовать dataproc. Я пробовал и hdfs, и облачное хранилище Google, но в любом случае хранилище было пустым, но rdd говорит, что это контрольная точка.
Кто-нибудь может мне с этим помочь? Заранее спасибо!
Если вы используете localCheckpoint, он будет записывать на локальный диск исполнителей, а не на HDFS / GCS: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint--.
Также обратите внимание, что есть активный (контрольная точка немедленно) и не нетерпеливый (контрольная точка, когда RDD фактически материализуется) режим контрольной точки. Это может повлиять на то, что возвращают эти методы. Код часто является лучшей документацией: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1615.
Как правило, к таким вопросам прилагайте образцы кода (репродукции) - так мы сможем ответить на ваш вопрос более прямо.
Мне не знакомо слово «материализованный», но после контрольной точки я сохраняю rdd и собираю статистику, и даже после этого я не могу найти rdd в каталоге
Извините, что я не могу поделиться всеми кодами, но в тот момент, когда я проверяю rdds, я просто написал dataframe.rdd.checkpoint () / dataframe.persist (), а до этого sparkcontext.setCheckpointDir ("gs: //. ...... ") для установки контрольных точек.
Создайте временный каталог, используя ваш объект SparkSession
:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
dataframe_name = # Any Spark Dataframe of type <pyspark.sql.dataframe.DataFrame>
на данный момент dataframe_name
будет DAG
, который вы можете сохранить как контрольную точку, например,
dataframe_checkpoint = dataframe_name.checkpoint()
dataframe_checkpoint
также является искровым фреймом данных типа <pyspark.sql.dataframe.DataFrame>
, но вместо DAG
он хранит результат запроса.
Используйте контрольные точки, если:
Что касается localcheckpoint, я читал, что он записывает на локальный диск, как вы сказали, поэтому я не настраивал для этого какой-либо каталог. Я ищу способы проверить, правильно ли я установил контрольную точку, поскольку все упомянутые мной флаги имеют значение false.