Я ищу способ отлаживать Spark Pandas UDF в версии vscode и Pycharm Community (поместить точку останова и остановить внутри UDF). В момент, когда точка останова находится внутри UDF, отладчик не останавливается.
В ссылке ниже описан локальный режим и распределенный режим.
Я пытаюсь хотя бы отладить в локальном режиме. В коде Pycharm/VS должен быть способ отладки локального enc с помощью «Присоединиться к локальному процессу». Только не могу понять как.
На данный момент я не могу найти ответа, как подключить отладчик pyspark к локальному процессу внутри UDF в VS Code (мой разработчик).
Я нашел только примеры ниже в Pycharm.
Когда я пытаюсь подключиться к процессу, я получаю сообщение ниже в Pycharm. В VS Code я получаю сообщение о том, что процесс не может быть присоединен.
Attaching to a process with PID=33,692
/home/usr_name/anaconda3/envs/yf/bin/python3.8 /snap/pycharm-community/223/plugins/python-ce/helpers/pydev/pydevd_attach_to_process/attach_pydevd.py --port 40717 --pid 33692
WARNING: The 'kernel.yama.ptrace_scope' parameter value is not 0, attach to process may not work correctly.
Please run 'sudo sysctl kernel.yama.ptrace_scope=0' to change the value temporary
or add the 'kernel.yama.ptrace_scope = 0' line to /etc/sysctl.d/10-ptrace.conf to set it permanently.
Process finished with exit code 0
Server stopped.
Пример кода, точка останова не останавливается внутри UDF pandas_function(url_json):
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
.master('local[*]') \
.getOrCreate()
sc = spark.sparkContext
# Create initial dataframe respond_sdf
d_list = [('api_1',"{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
(' api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]
schema = StructType([
StructField('url', StringType(), True),
StructField('content', StringType(), True)
])
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
# Pandas UDF
def pandas_function(url_json):
# Here I want to place breakpoint
df = pd.DataFrame(eval(url_json['content'][0]))
return df
# Pnadas UDF transformation applied to respond_sdf
respond_sdf.groupby(F.monotonically_increasing_id()).applyInPandas(pandas_function, schema=schema).show()
@mck Спасибо за информацию, в данный момент я печатаю журнал pyspark в файл и сохраняю переменные из UDF iside в pickle, чтобы получить точное состояние, но это больно. Я хотел бы плавную отладку с VS Code, остановившись внутри UDF и выполнив различные команды в консоли отладки VS Code, чтобы ускорить отладку.
это, вероятно, сложно, учитывая, как работает искра, как описано в вашем связанном вопросе. возможно, вы могли бы описать в своем вопросе, с какой проблемой вы столкнулись, следуя этому ответу?
Одна вещь, которую я могу предложить вам, Дэн, сначала примерьте свои данные на управляемую часть, она слишком велика, чтобы сразу поместиться в память, опубликуйте, что вы должны либо собрать, либо преобразовать в фрейм данных pandas, теперь вы можете попытаться запустить функция с вами будет работать на вашем фрейме данных как udf, так как она будет работать в родной отладке python.
В этом примере показано, как использовать отличную библиотеку pyspark_exray
для перехода к функциям UDF, переданным в функцию Dataframe.mapInPandas.
https://github.com/bradyjiang/pyspark_xray/blob/master/demo_app02/driver.py
если настройка отладчика слишком громоздка, вы можете поместить операторы печати внутри UDF.