Я хочу развернуть векторный столбец в обычные столбцы в фрейме данных. .transform создает отдельные столбцы, но что-то не так с типами данных или «nullable», что выдает ошибку, когда я пытаюсь .show — см. пример кода ниже. Как решить проблему?
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
spark = SparkSession\
.builder\
.config("spark.driver.maxResultSize", "40g") \
.config('spark.sql.shuffle.partitions', '2001') \
.getOrCreate()
data = [(0.2, 53.3, 0.2, 53.3),
(1.1, 43.3, 0.3, 51.3),
(2.6, 22.4, 0.4, 43.3),
(3.7, 25.6, 0.2, 23.4)]
df = spark.createDataFrame(data, ['A','B','C','D'])
df.show(3)
df.printSchema()
vecAssembler = VectorAssembler(inputCols=['C','D'], outputCol = "features")
new_df = vecAssembler.transform(df)
new_df.printSchema()
new_df.show(3)
split1_udf = udf(lambda value: value[0], DoubleType())
split2_udf = udf(lambda value: value[1], DoubleType())
new_df = new_df.withColumn('c1', split1_udf('features')).withColumn('c2', split2_udf('features'))
new_df.printSchema()
new_df.show(3)
Сообщение об ошибке длиной в милю, ключевые строки в моем понимании следующие: Вызвано: net.razorvine.pickle.PickleException: ожидаемые нулевые аргументы для построения ClassDict (для numpy.dtype)
Py4JJavaErrorTraceback (последний последний вызов) ----> 1 new_df.show(3) /opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/dataframe.py в show(self, n, усечь, по вертикали) --> 350 print(self._jdf.showString(n, 20, по вертикали))
/usr/local/lib/python2.7/site-packages/py4j/java_gateway.pyc в __call__(self, *args) 1256 return_value = get_return_value( -> 1257 ответ, self.gateway_client, self.target_id, self.name) /opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/utils.py в deco(*a, **kw) ---> 63 return f(*a, **kw) / usr/local/lib/python2.7/site-packages/py4j/protocol.pyc в get_return_value(ответ, gateway_client, target_id, имя) --> формат 328(target_id, ".", имя), значение)
@Goodfaithuser не вставляйте сообщение об ошибке в комментарии. Вместо редактировать ваш вопрос должен включать полную трассировку. Я понимаю, что это будет долго, но это то, что есть.
столбец признаков содержит тип pyspark.ml.linalg.DenseVector
, а элементы вектора признаков имеют тип numpy.float64
.
Чтобы преобразовать numpy dtypes
в нативные python
типы value.item()
split1_udf = udf(lambda value: value[0].item(), DoubleType())
split2_udf = udf(lambda value: value[1].item(), DoubleType())
Использование этого исправления приводит к следующему выводу
+---+----+---+----+----------+---+----+
| A| B| C| D| features| c1| c2|
+---+----+---+----+----------+---+----+
|0.2|53.3|0.2|53.3|[0.2,53.3]|0.2|53.3|
|1.1|43.3|0.3|51.3|[0.3,51.3]|0.3|51.3|
|2.6|22.4|0.4|43.3|[0.4,43.3]|0.4|43.3|
|3.7|25.6|0.2|23.4|[0.2,23.4]|0.2|23.4|
+---+----+---+----+----------+---+----+
Я не знаю, в чем проблема с UDF. Но я нашел другое решение - ниже.
data = [(0.2, 53.3, 0.2, 53.3),
(1.1, 43.3, 0.3, 51.3),
(2.6, 22.4, 0.4, 43.3),
(3.7, 25.6, 0.2, 23.4)]
df = spark.createDataFrame(data, ['A','B','C','D'])
vecAssembler = VectorAssembler(inputCols=['C','D'], outputCol = "features")
new_df = vecAssembler.transform(df)
def extract(row):
return (row.A, row.B,row.C,row.D,) + tuple(row.features.toArray().tolist())
extracted_df = new_df.rdd.map(extract).toDF(['A','B','C','D', 'col1', 'col2'])
extracted_df.show()
В чем ошибка?