Объект pickle (данные модели) работает с проблемой памяти при потоковой передаче spark

Пытались применить маринованную модель для прогнозирования потоковых данных. Первоначально размер модели составлял почти 1 ГБ, и мы думали, что уменьшение может решить проблему. Использовал другой протокол и сжатие для обработки объекта и уменьшил его до 60 МБ.

Поток входных данных представляет собой запись json, и прогноз применяется к 3 ключам.

Создание объекта рассола:

Ранее:

joblib.dump(pipeline, 'itemc_nb.pkl') 

Текущий:

joblib.dump(pipeline, 'itemc_nb.pkl',compress=1,protocol=-1) 

Еще одна теория, которую я тестировал, - это потребление памяти на граничном узле, на котором выполняется потоковый скрипт. На полной мощности он работает на 70%, как видно здесь

Пограничный узел имеет емкость 22 ГБ.

Другая мысль связана с тем, сколько раз модель может вызываться, а не собирать мусор. Как это могло быть решено забрать только один раз?

    model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))

Вызов функции для оценки входной строки приведен ниже. Здесь также могут быть недостатки, которые могут быть причиной этого.

def predict_result(text):
    ret_val = ''
    try:
        if text is not None and (type(text) == str or type(text) == unicode):
        text = text.strip()
        text = text.lower()
        text = ''.join([i for i in text if not i.isdigit()])
        text = ' '.join(text.split())
        text = ' '.join([word for word in text.split() if word not in (stopwords.words('english'))])
        text = text.split(' ', 0)
        if re.match(r"^([a-z]|[0-9])\b", text[0]): #single letter removal
            return 'non-relevant'
        elif text[0] in ('n/a','na','.','nada','no','xx',''):  #cleaning list
            return 'non-relevant'
        elif not text[0]:
            return 'non-relevant'
        else:
            prediction = model.predict(text)
            cat_name = cat_dict.get(prediction[0], 'No key found')
            ret_val = cat_name
except (AttributeError, KeyError) as e:
        ret_val = 'error'

return ret_val

На данный момент ищу несколько мнений.

Exception encountered while processing data:
An error occurred while calling o394689.insertInto.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 366.0 failed 1 times, most recent failure: Lost task 0.0 in stage 366.0 (TID 366, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 442, in loads
    return pickle.loads(obj)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 700, in subimport
    __import__(name)
  File "/tmp/spark-9e6c86f3-4d80-4bef-833e-e5a225d2824f/userFiles-1784ee88-ee98-467d-9abd-f017cccecf49/streaming_models.zip/itemc/itemc_tagger.py", line 14, in <module>
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 578, in load
    obj = _unpickle(fobj, filename, mmap_mode)
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 508, in _unpickle
    obj = unpickler.load()
  File "/opt/rh/python27/root/usr/lib64/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 341, in load_build
    self.stack.append(array_wrapper.read(self))
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 184, in read
    array = self.read_array(unpickler)
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 130, in read_array
    array = unpickler.np.empty(count, dtype=self.dtype)
MemoryError: (MemoryError(), <function subimport at 0x7f1d4f353050>, ('itemc.itemc_tagger',))

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
    at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:170)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
0
0
395
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это была ошибка памяти из-за того, что большой файл pkl пытался взаимодействовать с данными живого потока. Решили эту проблему, развернув сжатую модель.

joblib.dump(pipeline, 'item.pkl',compress=1,protocol=-1) 

Размер модели увеличился с 1 ГБ до 60 МБ.

Другие вопросы по теме