Pandas-on-spark бросает java.lang.StackOverFlowError

Я использую pandas-on-spark в сочетании с регулярным выражением, чтобы удалить некоторые сокращения из столбца в фрейме данных. В пандах это все работает нормально, но у меня стоит задача перенести этот код в рабочую нагрузку на нашем искровом кластере, и поэтому решил использовать панды-на-спарке. Однако я столкнулся со странной ошибкой. Я использую следующую функцию для очистки аббревиатур (здесь несколько упрощено для удобства чтения, на самом деле abbreviations_dict имеет 61 аббревиатуру, а шаблоны представляют собой список с тремя шаблонами регулярных выражений).

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)

    return job_list

Когда я затем вызываю функцию с помощью серии pspd и выполняю действие, чтобы выполнить план запроса:

df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))

он выдает ошибку java.lang.StackOverflowError. Трассировка стека слишком длинная для вставки сюда, я вставил ее часть, так как она повторяется.

23/05/05 09:53:14 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (PC ID executor driver): java.lang.StackOverflowError
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2408)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)

Так продолжается довольно долго, пока я не получаю:

23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
    self.handle()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
    poll(accum_updates)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
    if func():
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
    length = stream.read(4)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>

Некоторые вещи, которые я пробовал / факты, которые, как мне кажется, могут иметь значение:

  • На данный момент я пытаюсь запустить это локально. Я запускаю его локально на подмножество из 5000 строк данных, так что это не должно быть проблемой. Возможно, увеличение какой-либо конфигурации по умолчанию все еще может помочь.
  • Я думаю, что это связано с ленивой оценкой в ​​spark и слишком большим DAG для spark из-за циклов for в функции. Но я понятия не имею, как решить проблему. Согласно Документация по лучшим практикам pyspark-on-pandas Я пытался реализовать контрольные точки, но это недоступно для pspd.Series, и преобразование моей серии в pspd.Dataframe делает .apply(lambda ...) сбой внутри функции resolve_abbreviations.

Любая помощь будет принята с благодарностью. Возможно, мне лучше избегать API-интерфейса pandas-on-spark и преобразовать код в обычный pyspark, поскольку API-интерфейс pandas-on-spark, по-видимому, еще недостаточно развит для запуска сценариев pandas «как есть»? Или, может быть, дизайн нашего кода ущербен по своей природе, и есть другой эффективный способ добиться подобных результатов?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
4
0
129
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Возможно ли, что ваши входные данные глубоко вложены? Это может способствовать зацикливанию вызовов стека, которые вы можете увидеть здесь.

Первое, что я бы попробовал, это запустить с большим размером стека, чем вы делаете сейчас. Я не уверен, на какой версии ОС/java вы это используете, поэтому не могу знать, какой размер стека по умолчанию на вашем компьютере. Однако обычно он колеблется в пределах от 100 КБ до 1024 КБ.

Попробуйте запустить его с размером стека 4 МБ. Внутри JVM это делается с помощью параметра Xss. Вы захотите сделать это в драйвере с параметром конфигурации spark.driver.extraJavaOptions. Что-то вроде этого:

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
     .setMaster("whateverMasterYouHave")
     .setAppName("MyApp")
     .set("spark.driver.extraJavaOptions", "-Xss4M"))
sc = SparkContext.getOrCreate(conf = conf)

Спасибо. У вас есть ссылка на дополнительную информацию по этому поводу? Например, откуда вы узнали, что параметр Xss управляет размером стека?

Psychotechnopath 04.05.2023 10:02

Apache Spark написан на языке Scala, который работает в так называемой виртуальной машине Java. Очень полезно иметь некоторые знания о JVM, когда вы работаете со Spark (даже если вы пишете Python, в основе лежит JVM). Дополнительную информацию о параметре Xss можно найти здесь , но я также предлагаю прочитать о JVM на таких сайтах, как этот или, может быть, в некоторых вводных руководствах.

Koedlt 04.05.2023 12:02

Большое спасибо - я уже понял, что некоторые знания JVM будут полезны.

Psychotechnopath 04.05.2023 13:00

Является ли StackOverFlow вызванным малым размером стека, или я прав в своем предположении о ленивом вычислении, и поэтому этот код ошибочен/антипаттерн с самого начала? Увеличение размера стека решает проблему на моем локальном компьютере, но я все еще опасаюсь применять этот код в производстве, поскольку не хочу резко увеличивать наши затраты на облачные вычисления. Я думаю, что шаблон кода, подобный представленному здесь, распространен в пандах, и поэтому вопрос может быть применим к более широкой аудитории. Поэтому я назначу награду за ответ, который также касается части ленивой оценки. Возможно, вы что-то знаете об этом?

Psychotechnopath 04.05.2023 14:12

После внимательного изучения вашей трассировки стека я понимаю, что первый абзац в моем ответе неверен. Я отредактирую это. В любом случае, я не думаю, что это связано с ленивой моделью оценки Spark. Можете ли вы отредактировать свой пост и добавить более длинную версию трассировки стека, которую вы уже опубликовали? В частности, строки, показывающие at org.apache.spark...., чтобы увидеть, где ошибка происходит в самом Spark? Возможно ли, что ваши входные данные глубоко вложены? Это может способствовать зацикливанию вызовов стека, которые я там вижу.

Koedlt 04.05.2023 15:51

Я отредактировал свой вопрос, включив в него более длинную версию трассировки стека. Кажется, в нем нет строк org.apache.spark. Насколько я знаю, мои данные не вложены, я просто очищаю столбец со строками, без сложных структур данных. Обратите внимание, однако, что функция, которую я представляю здесь, несколько упрощена для удобства чтения. На самом деле в Abbreviations dict 61 аббревиатура, а Patterns — это список из трех регулярных выражений. Следовательно, двойной цикл for в функции выполняется всего 61x3 = 183 итерации.

Psychotechnopath 05.05.2023 10:03

Ммм интересно! Трассировка стека, которую вы добавили, выглядит так, как будто это трассировка стека исполнителя. У вас также есть трассировка стека драйвера? Мы должны где-то найти искровую линию Apache? Может быть, это помогает записать все ваши журналы в текстовый файл? Что-то вроде spark-submit <args> &> myTextFile? И тогда вы можете искать там строки org.apache.spark?

Koedlt 08.05.2023 12:44

Я использую два способа запуска искрового кода на моей локальной машине. Первый — через консоль python на PyCharm, используя среду conda с установленным pip install pyspark. Второй способ — полная установка Spark на WSL linux. Это единственное место, где я могу использовать spark-submit. Запуск кода в WSL Linux работает нормально. Возможно, ошибка StackOverFlowError была вызвана тем, что оболочка python выделяла меньше ресурсов для искры? По крайней мере, теперь я совершенно уверен, что проблема была не в шаблоне кода, поскольку он отлично работает на других установках. Это было моей главной заботой, поэтому я принял ваш ответ.

Psychotechnopath 09.05.2023 16:51

У меня все еще возникают проблемы с этим кодом, и я задал по нему отдельный вопрос. Может быть, вы сможете пролить на это свой свет? stackoverflow.com/questions/76218874/…

Psychotechnopath 10.05.2023 15:15

Другие вопросы по теме