Я использую pandas-on-spark в сочетании с регулярным выражением, чтобы удалить некоторые сокращения из столбца в фрейме данных. В пандах это все работает нормально, но у меня стоит задача перенести этот код в рабочую нагрузку на нашем искровом кластере, и поэтому решил использовать панды-на-спарке. Однако я столкнулся со странной ошибкой. Я использую следующую функцию для очистки аббревиатур (здесь несколько упрощено для удобства чтения, на самом деле abbreviations_dict имеет 61 аббревиатуру, а шаблоны представляют собой список с тремя шаблонами регулярных выражений).
import pyspark.pandas as pspd
def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
"""
The job titles contain a lot of abbreviations for common terms.
We write them out to create a more standardized job title list.
:param job_list: df.SchoneFunctie during processing steps
:return: SchoneFunctie where abbreviations are written out in words
"""
abbreviations_dict = {
"1e": "eerste",
"1ste": "eerste",
"2e": "tweede",
"2de": "tweede",
"3e": "derde",
"3de": "derde",
"ceo": "chief executive officer",
"cfo": "chief financial officer",
"coo": "chief operating officer",
"cto": "chief technology officer",
"sr": "senior",
"tech": "technisch",
"zw": "zelfstandig werkend"
}
#Create a list of abbreviations
abbreviations_pob = list(abbreviations_dict.keys())
#For each abbreviation in this list
for abb in abbreviations_pob:
# define patterns to look for
patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
fr'{abb}\.']
# actual recoding of abbreviations to written out form
value_to_replace = abbreviations_dict[abb]
for patt in patterns:
job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)
return job_list
Когда я затем вызываю функцию с помощью серии pspd и выполняю действие, чтобы выполнить план запроса:
df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))
он выдает ошибку java.lang.StackOverflowError. Трассировка стека слишком длинная для вставки сюда, я вставил ее часть, так как она повторяется.
23/05/05 09:53:14 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (PC ID executor driver): java.lang.StackOverflowError
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2408)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
Так продолжается довольно долго, пока я не получаю:
23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
coro = func()
File "<input>", line 194, in <module>
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
return self._internal.to_pandas_frame
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
pdf = sdf.toPandas()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
return f(*a, **kw)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
self.process_request(request, client_address)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
self.finish_request(request, client_address)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
self.handle()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
poll(accum_updates)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
if func():
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
num_updates = read_int(self.rfile)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
length = stream.read(4)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
coro = func()
File "<input>", line 194, in <module>
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
return self._internal.to_pandas_frame
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
pdf = sdf.toPandas()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
return f(*a, **kw)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>
Некоторые вещи, которые я пробовал / факты, которые, как мне кажется, могут иметь значение:
pspd.Dataframe делает
.apply(lambda ...) сбой внутри функции resolve_abbreviations.Любая помощь будет принята с благодарностью. Возможно, мне лучше избегать API-интерфейса pandas-on-spark и преобразовать код в обычный pyspark, поскольку API-интерфейс pandas-on-spark, по-видимому, еще недостаточно развит для запуска сценариев pandas «как есть»? Или, может быть, дизайн нашего кода ущербен по своей природе, и есть другой эффективный способ добиться подобных результатов?






Возможно ли, что ваши входные данные глубоко вложены? Это может способствовать зацикливанию вызовов стека, которые вы можете увидеть здесь.
Первое, что я бы попробовал, это запустить с большим размером стека, чем вы делаете сейчас. Я не уверен, на какой версии ОС/java вы это используете, поэтому не могу знать, какой размер стека по умолчанию на вашем компьютере. Однако обычно он колеблется в пределах от 100 КБ до 1024 КБ.
Попробуйте запустить его с размером стека 4 МБ. Внутри JVM это делается с помощью параметра Xss. Вы захотите сделать это в драйвере с параметром конфигурации spark.driver.extraJavaOptions. Что-то вроде этого:
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("whateverMasterYouHave")
.setAppName("MyApp")
.set("spark.driver.extraJavaOptions", "-Xss4M"))
sc = SparkContext.getOrCreate(conf = conf)
Apache Spark написан на языке Scala, который работает в так называемой виртуальной машине Java. Очень полезно иметь некоторые знания о JVM, когда вы работаете со Spark (даже если вы пишете Python, в основе лежит JVM). Дополнительную информацию о параметре Xss можно найти здесь , но я также предлагаю прочитать о JVM на таких сайтах, как этот или, может быть, в некоторых вводных руководствах.
Большое спасибо - я уже понял, что некоторые знания JVM будут полезны.
Является ли StackOverFlow вызванным малым размером стека, или я прав в своем предположении о ленивом вычислении, и поэтому этот код ошибочен/антипаттерн с самого начала? Увеличение размера стека решает проблему на моем локальном компьютере, но я все еще опасаюсь применять этот код в производстве, поскольку не хочу резко увеличивать наши затраты на облачные вычисления. Я думаю, что шаблон кода, подобный представленному здесь, распространен в пандах, и поэтому вопрос может быть применим к более широкой аудитории. Поэтому я назначу награду за ответ, который также касается части ленивой оценки. Возможно, вы что-то знаете об этом?
После внимательного изучения вашей трассировки стека я понимаю, что первый абзац в моем ответе неверен. Я отредактирую это. В любом случае, я не думаю, что это связано с ленивой моделью оценки Spark. Можете ли вы отредактировать свой пост и добавить более длинную версию трассировки стека, которую вы уже опубликовали? В частности, строки, показывающие at org.apache.spark...., чтобы увидеть, где ошибка происходит в самом Spark? Возможно ли, что ваши входные данные глубоко вложены? Это может способствовать зацикливанию вызовов стека, которые я там вижу.
Я отредактировал свой вопрос, включив в него более длинную версию трассировки стека. Кажется, в нем нет строк org.apache.spark. Насколько я знаю, мои данные не вложены, я просто очищаю столбец со строками, без сложных структур данных. Обратите внимание, однако, что функция, которую я представляю здесь, несколько упрощена для удобства чтения. На самом деле в Abbreviations dict 61 аббревиатура, а Patterns — это список из трех регулярных выражений. Следовательно, двойной цикл for в функции выполняется всего 61x3 = 183 итерации.
Ммм интересно! Трассировка стека, которую вы добавили, выглядит так, как будто это трассировка стека исполнителя. У вас также есть трассировка стека драйвера? Мы должны где-то найти искровую линию Apache? Может быть, это помогает записать все ваши журналы в текстовый файл? Что-то вроде spark-submit <args> &> myTextFile? И тогда вы можете искать там строки org.apache.spark?
Я использую два способа запуска искрового кода на моей локальной машине. Первый — через консоль python на PyCharm, используя среду conda с установленным pip install pyspark. Второй способ — полная установка Spark на WSL linux. Это единственное место, где я могу использовать spark-submit. Запуск кода в WSL Linux работает нормально. Возможно, ошибка StackOverFlowError была вызвана тем, что оболочка python выделяла меньше ресурсов для искры? По крайней мере, теперь я совершенно уверен, что проблема была не в шаблоне кода, поскольку он отлично работает на других установках. Это было моей главной заботой, поэтому я принял ваш ответ.
У меня все еще возникают проблемы с этим кодом, и я задал по нему отдельный вопрос. Может быть, вы сможете пролить на это свой свет? stackoverflow.com/questions/76218874/…
Спасибо. У вас есть ссылка на дополнительную информацию по этому поводу? Например, откуда вы узнали, что параметр Xss управляет размером стека?