Pyspark: динамически добавить одну строку в окончательный фрейм данных

У меня есть окончательный фрейм данных в этом формате:

  • Product_ID: строка
  • Product_COD: строка
  • Product_NAM: строка
  • Продукт_VER: целое число
  • ProductLine_NAM: строка
  • Language_COD: строка
  • ProductType_NAM: строка
  • Load_DAT: целое число
  • LoadEnd_DAT: целое число
  • edmChange_DTT: метка времени

и я хочу добавить новую строку в этот фрейм данных, где идентификатор (Product_ID) равен -1, а в столбцах строк вставьте «Неизвестно», а в остальных типах данных установите значение «null», например:

Я создал этот код:

id_column = "Product_ID"
df_lessOne = spark.createDataFrame(["-1"], "string").toDF(id_column) #create a new id_column row with -1

appended_df = finalDf.unionByName(df_lessOne, allowMissingColumns=True) #add the rest columns of dataframe with nulls

appended_df_filter = appended_df.filter(""+ id_column + " = '-1'")

columns = [item[0] for item in appended_df_filter.dtypes if item[1].startswith('string')] #select only string columns

# replace string columns with "Unknown" 
for c_na in columns:
    appended_df_filter = (appended_df_filter
                               .filter(""+ id_column + " = '-1'")
                               .withColumn(c_na, lit('Unknown'))
                         )
                          
appended_df = appended_df.filter(""+ id_column + " <> '-1'")

dfs = [appended_df, appended_df_filter]

#add final -1 row to the final dataframe
finalDf = reduce(DataFrame.unionAll, dfs)

display(finalDf)

но, к сожалению, это плохо работает.

Я пытаюсь создать это динамически, потому что после того, как я хочу использовать его в других фреймах данных. Мне просто нужно изменить id_column после.

Может ли кто-нибудь помочь мне в достижении этого

Спасибо!

Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
0
52
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
from pyspark.sql.types import *
from datetime import datetime
import pyspark.sql.functions as F

data2 = [
    ("xp3980","2103","Product_1",1,"PdLine_23","XX1","PNT_1",2,36636,datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S')),
    ("gi9387","2411","Product_2",1,"PdLine_44","YT89","PNT_6",2,35847,datetime.strptime('2021-07-21 7:00:00', '%Y-%m-%d %H:%M:%S'))
  ]

schema = StructType([ \
    StructField("Product_ID",StringType(),True), \
    StructField("Product_COD",StringType(),True), \
    StructField("Product_NAM",StringType(),True), \
    StructField("Product_VER", IntegerType(),True), \
    StructField("ProductLine_NAM", StringType(), True), \
    StructField("Language_COD", StringType(), True), \
    StructField("ProductType_NAM", StringType(), True), \
    StructField("Load_DAT", IntegerType(), True), \
    StructField("LoadEnd_DAT", IntegerType(), True), \
    StructField("edmChange_DTT", TimestampType(), True) \
  ])
 
my_df = spark.createDataFrame(data=data2,schema=schema)

df_res = spark.createDataFrame([(-1,)]).toDF("Product_ID")

for c in my_df.schema:
    if str(c.name) == 'Product_ID':
        continue
    if str(c.dataType) == 'StringType':
        df_res = df_res.withColumn(c.name, F.lit('Unknown'))
    else:
        df_res = df_res.withColumn(c.name, F.lit(None))

my_df.union(df_res).show()

+----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
# |Product_ID|Product_COD|Product_NAM|Product_VER|ProductLine_NAM|Language_COD|ProductType_NAM|Load_DAT|LoadEnd_DAT|      edmChange_DTT|
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
# |    xp3980|       2103|  Product_1|          1|      PdLine_23|         XX1|          PNT_1|       2|      36636|2020-08-20 10:00:00|
# |    gi9387|       2411|  Product_2|          1|      PdLine_44|        YT89|          PNT_6|       2|      35847|2021-07-21 07:00:00|
# |        -1|    Unknown|    Unknown|       null|        Unknown|     Unknown|        Unknown|    null|       null|               null|
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+

Большое спасибо Луис, это работает хорошо :) !!!!

coding 18.11.2022 16:13

Другие вопросы по теме

Похожие вопросы

Каков отраслевой стандарт метода дедупликации в потоках данных?
Как снова использовать вновь созданный столбец в преобразовании производного столбца в том же преобразовании производного столбца?
Можно ли запустить Node-Red в докере на виртуальной машине EFLOW (Azure IoT Edge на устройстве Windows)?
Как проверить, пусты ли файлы в каталоге озера данных с помощью Фабрики данных Azure?
Azure Databricks: непредвиденный сбой при ожидании готовности кластера. Причина Кластер непригоден для использования, так как драйвер неисправен
Напишите сценарий PowerShell с помощью модуля Runbook, чтобы сделать моментальный снимок виртуальной машины: Запуск от имени учетной записи: Azure
Как добавить журналы в аналитику журналов в приложениях логики Azure?
Локальный репозиторий helm не обновляется из реестра контейнеров Azure
Поставщик учетных данных клиента Azure выдает ошибку "/me request действителен только с делегированным потоком проверки подлинности"
Хранилище BLOB-объектов Azure с Python, создавать контейнеры, но не перечислять их?