Spark: как отлаживать pandas-UDF в VS Code

Я ищу способ отлаживать Spark Pandas UDF в версии vscode и Pycharm Community (поместить точку останова и остановить внутри UDF). В момент, когда точка останова находится внутри UDF, отладчик не останавливается.

В ссылке ниже описан локальный режим и распределенный режим.

Я пытаюсь хотя бы отладить в локальном режиме. В коде Pycharm/VS должен быть способ отладки локального enc с помощью «Присоединиться к локальному процессу». Только не могу понять как.

На данный момент я не могу найти ответа, как подключить отладчик pyspark к локальному процессу внутри UDF в VS Code (мой разработчик).

Я нашел только примеры ниже в Pycharm.

  1. Прикрепить к локальному процессу Как вызвать PySpark в режиме отладки?

Когда я пытаюсь подключиться к процессу, я получаю сообщение ниже в Pycharm. В VS Code я получаю сообщение о том, что процесс не может быть присоединен.

Attaching to a process with PID=33,692
/home/usr_name/anaconda3/envs/yf/bin/python3.8 /snap/pycharm-community/223/plugins/python-ce/helpers/pydev/pydevd_attach_to_process/attach_pydevd.py --port 40717 --pid 33692
WARNING: The 'kernel.yama.ptrace_scope' parameter value is not 0, attach to process may not work correctly.
         Please run 'sudo sysctl kernel.yama.ptrace_scope=0' to change the value temporary
     or add the 'kernel.yama.ptrace_scope = 0' line to /etc/sysctl.d/10-ptrace.conf to set it permanently.

Process finished with exit code 0
Server stopped.
  1. pyspark_xray https://github.com/bradyjiang/pyspark_xray С помощью этого пакета можно отлаживать rdd, работающие на worker, но мне не удалось настроить пакет для отладки UDF.

Пример кода, точка останова не останавливается внутри UDF pandas_function(url_json):

import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext

# Create initial dataframe respond_sdf
d_list = [('api_1',"{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
            (' api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]

schema = StructType([
  StructField('url', StringType(), True),
  StructField('content', StringType(), True)
  ])

jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)

# Pandas UDF 
def pandas_function(url_json):
# Here I want to place breakpoint
    df = pd.DataFrame(eval(url_json['content'][0]))
    return df

# Pnadas UDF transformation applied to respond_sdf
respond_sdf.groupby(F.monotonically_increasing_id()).applyInPandas(pandas_function, schema=schema).show()

если настройка отладчика слишком громоздка, вы можете поместить операторы печати внутри UDF.

mck 25.12.2020 16:52

@mck Спасибо за информацию, в данный момент я печатаю журнал pyspark в файл и сохраняю переменные из UDF iside в pickle, чтобы получить точное состояние, но это больно. Я хотел бы плавную отладку с VS Code, остановившись внутри UDF и выполнив различные команды в консоли отладки VS Code, чтобы ускорить отладку.

Dan 25.12.2020 17:08

это, вероятно, сложно, учитывая, как работает искра, как описано в вашем связанном вопросе. возможно, вы могли бы описать в своем вопросе, с какой проблемой вы столкнулись, следуя этому ответу?

mck 25.12.2020 17:10

Одна вещь, которую я могу предложить вам, Дэн, сначала примерьте свои данные на управляемую часть, она слишком велика, чтобы сразу поместиться в память, опубликуйте, что вы должны либо собрать, либо преобразовать в фрейм данных pandas, теперь вы можете попытаться запустить функция с вами будет работать на вашем фрейме данных как udf, так как она будет работать в родной отладке python.

Aditya Vikram Singh 29.12.2020 17:19
1
4
2 702
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В этом примере показано, как использовать отличную библиотеку pyspark_exray для перехода к функциям UDF, переданным в функцию Dataframe.mapInPandas.

https://github.com/bradyjiang/pyspark_xray/blob/master/demo_app02/driver.py

Другие вопросы по теме