PySpark UDF: пример первого преобразования

Я действительно новичок в PySpark и пытаюсь перевести некоторый код Python в pyspark. Я начинаю с панды, конвертирую в документ-матрицу терминов, а затем применяю PCA.

UDF:


    class MultiLabelCounter():
        def __init__(self, classes=None):
            self.classes_ = classes

        def fit(self,y):
            self.classes_ = 
    sorted(set(itertools.chain.from_iterable(y)))
            self.mapping = dict(zip(self.classes_,
                                         
    range(len(self.classes_))))
            return self

    def transform(self,y):
        yt = []
        for labels in y:
            data = [0]*len(self.classes_)
            for label in labels:
                data[self.mapping[label]] +=1
            yt.append(data)
        return yt

    def fit_transform(self,y):
        return self.fit(y).transform(y)

    mlb = MultiLabelCounter()
    df_grouped = 
    df_grouped.withColumnRenamed("collect_list(full)","full")

    udf_mlb = udf(lambda x: mlb.fit_transform(x),IntegerType())

    mlb_fitted = df_grouped.withColumn('full',udf_mlb(col("full")))

Я, конечно, получаю NULL результаты.

Я использую искру версии 2.4.4.

РЕДАКТИРОВАТЬ

Добавление образца ввода и вывода по запросу

Вход:

|id|val|
|--|---|
|1|[hello,world]|
|2|[goodbye, world]|
|3|[hello,hello]|

Выход:

|id|hello|goodbye|world|
|--|-----|-------|-----|
|1|1|0|1|
|2|0|1|1|
|3|2|0|0|

Можете ли вы поделиться некоторыми примерами входных и выходных наборов данных?

DKNY 18.03.2022 15:23

может ли этот метод быть функцией вне класса? Это облегчило бы задачу.

Luiz Viola 18.03.2022 15:41

@LuizViola да, я уверен, что их можно переместить за пределы класса

laila 18.03.2022 16:26

@DKNY Я добавил это

laila 18.03.2022 16:26

Глядя на ввод и вывод, кажется, что это реализация одного горячего кодирования в PySpark, это то, что вы ищете?

Luiz Viola 18.03.2022 16:30

@LuizViola как onehotencode, но с подсчетом частоты вместо двоичного

laila 18.03.2022 16:35
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
6
34
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Основываясь на общих входных данных, я попытался воспроизвести ваши выходные данные, и это работает. Пожалуйста, смотрите ниже -

Входные данные

df = spark.createDataFrame(data=[(1, ['hello', 'world']), (2, ['goodbye', 'world']), (3, ['hello', 'hello'])], schema=['id', 'vals'])

df.show()

+---+----------------+
| id|            vals|
+---+----------------+
|  1|  [hello, world]|
|  2|[goodbye, world]|
|  3|  [hello, hello]|
+---+----------------+

Теперь с помощью explode создайте отдельные строки из vals элементов списка. После этого с помощью pivot и count будет вычислена частота. Наконец, замените значения null на 0 с помощью fillna(0). Смотри ниже -

from pyspark.sql.functions import *

df1 = df.select(['id', explode(col('vals'))]).groupBy("id").pivot("col").agg(count(col("col")))

df1.fillna(0).orderBy("id").show()

Выход

+---+-------+-----+-----+
| id|goodbye|hello|world|
+---+-------+-----+-----+
|  1|      0|    1|    1|
|  2|      1|    0|    1|
|  3|      0|    2|    0|
+---+-------+-----+-----+

Спасибо @DKNY за обтекаемый sloution, который работал по мере необходимости :)

laila 21.03.2022 13:45

Другие вопросы по теме