Есть ли способ в pyspark скопировать одну схему в другой фрейм данных?

У меня есть фрейм данных искры (df1) с определенной схемой, и у меня есть другой фрейм данных с теми же столбцами, но с другой схемой. Я знаю, как сделать это столбец за столбцом, но поскольку у меня большой набор столбцов, это будет довольно долго. Чтобы сохранить согласованность схемы между фреймами данных, мне было интересно, смогу ли я применить одну схему к фрейму данных или создать функцию, которая выполняет эту работу. вот пример

df1
root
|-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

df2
root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)`

Я хочу скопировать применить схему df1 к df2. Буду признателен за любые комментарии и указания.

Я пробую этот подход для одного столбца. Учитывая, что у меня большое количество столбцов, это был бы довольно долгий способ сделать это.

df2 = df2.withColumn("B", df2["B"].cast('int'))

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
89
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Попробуй это -

Входные кадры данных

from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime

data1 = [("2022-01-01", 1, "A"),
         ("2022-01-02", 2, "B"),
         ("2022-01-03", 3, "C")
        ]

data1 = [(datetime.strptime(date_str, "%Y-%m-%d"), b, c) for date_str, b, c in data1]

schema1 = StructType([StructField("A", DateType(), True),
                      StructField("B", IntegerType(), True),
                      StructField("C", StringType(), True)
                     ]
                    )

df1 = spark.createDataFrame(data1, schema=schema1)

df1.printSchema()

data2 = [("2022-01-04", "4", "D"),
         ("2022-01-05", "5", "E"),
         ("2022-01-06", "6", "F")
        ]
schema2 = StructType([StructField("A", StringType(), True),
                      StructField("B", StringType(), True),
                      StructField("C", StringType(), True)
                     ]
                    )
df2 = spark.createDataFrame(data2, schema=schema2)

df2.printSchema()
df2 = spark.createDataFrame(data=df2.rdd,schema=df1.schema)
df2.printSchema()

root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

В качестве альтернативы вы создаете метод, вам нужно более общее решение -

def apply_schema(df1, df2):
    schema1 = df1.schema
    
    schema2 = df2.schema
    
    data_types = {field.name: field.dataType for field in schema1.fields}
    
    for field in schema2.fields:
        column_name = field.name
        
        if column_name in data_types:
            column_type = data_types[column_name]
            df2 = df2.withColumn(column_name, df2[column_name].cast(column_type))
    
    return df2

И используйте этот метод, чтобы наложить схему df1 на df2 -

df2 = apply_schema(df1, df2)

print("Schema of df1:")
df1.printSchema()

print("Schema of df2:")
df2.printSchema()

df2.show()

Schema of df1:
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

Schema of df2:
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

+----------+---+---+
|         A|  B|  C|
+----------+---+---+
|2022-01-04|  4|  D|
|2022-01-05|  5|  E|
|2022-01-06|  6|  F|
+----------+---+---+

Метод df2 = spark.createDataFrame(data=df2.rdd,schema=df1.schema) не работает для преобразования типов string в integer,long,date..!

notNull 19.04.2023 04:22

Я проверил запрошенный сценарий, и он работает для меня.

Dipanjan Mallick 19.04.2023 04:24

о.. какую версию искры вы пробовали? Пробовал 3.2.1 получил ошибку Caused by: org.apache.spark.api.python.PythonException: 'TypeError: field B: LongType can not accept object '2' in type <class 'str'>'

notNull 19.04.2023 04:30

Я тестировал в искре 3.3.0 и 3.1.2 работал в обеих версиях. Даже работал с входными кадрами данных, которые являются частью вашего ответа. :)

Dipanjan Mallick 19.04.2023 04:39

Странно, не уверен, есть ли какие-то проблемы с моей настройкой... Спасибо за тест :-)

notNull 19.04.2023 04:42

оба метода работали для меня.

Harris 19.04.2023 06:41
Ответ принят как подходящий

Да, это возможно динамически с помощью dataframe.schema.fields

df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])

Example:

from pyspark.sql.functions import *
df1 = spark.createDataFrame([('2022-02-02',2,'a')],['A','B','C']).withColumn("A",to_date(col("A")))
print("df1 Schema")
df1.printSchema()
#df1 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)

df2 = spark.createDataFrame([('2022-02-02','2','a')],['A','B','C'])
print("df2 Schema")
df2.printSchema()
#df2 Schema
#root
# |-- A: string (nullable = true)
# |-- B: string (nullable = true)
# |-- C: string (nullable = true)
#

#casting the df2 columns by getting df1 schema using select clause
df3 = df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
df3.show(10,False)
print("df3 Schema")
df3.printSchema()

#+----------+---+---+
#|A         |B  |C  |
#+----------+---+---+
#|2022-02-02|2  |a  |
#+----------+---+---+

#df3 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)

В этом примере я определил df1 с помощью Integer,date,long types.

df2 определяется с помощью string типа.

df3 определяется с использованием df2 в качестве исходных данных и прикрепленного df1 schema.

Другие вопросы по теме