У меня есть фрейм данных искры (df1) с определенной схемой, и у меня есть другой фрейм данных с теми же столбцами, но с другой схемой. Я знаю, как сделать это столбец за столбцом, но поскольку у меня большой набор столбцов, это будет довольно долго. Чтобы сохранить согласованность схемы между фреймами данных, мне было интересно, смогу ли я применить одну схему к фрейму данных или создать функцию, которая выполняет эту работу. вот пример
df1
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
df2
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- C: string (nullable = true)`
Я хочу скопировать применить схему df1 к df2. Буду признателен за любые комментарии и указания.
Я пробую этот подход для одного столбца. Учитывая, что у меня большое количество столбцов, это был бы довольно долгий способ сделать это.
df2 = df2.withColumn("B", df2["B"].cast('int'))
Попробуй это -
Входные кадры данных
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
data1 = [("2022-01-01", 1, "A"),
("2022-01-02", 2, "B"),
("2022-01-03", 3, "C")
]
data1 = [(datetime.strptime(date_str, "%Y-%m-%d"), b, c) for date_str, b, c in data1]
schema1 = StructType([StructField("A", DateType(), True),
StructField("B", IntegerType(), True),
StructField("C", StringType(), True)
]
)
df1 = spark.createDataFrame(data1, schema=schema1)
df1.printSchema()
data2 = [("2022-01-04", "4", "D"),
("2022-01-05", "5", "E"),
("2022-01-06", "6", "F")
]
schema2 = StructType([StructField("A", StringType(), True),
StructField("B", StringType(), True),
StructField("C", StringType(), True)
]
)
df2 = spark.createDataFrame(data2, schema=schema2)
df2.printSchema()
df2 = spark.createDataFrame(data=df2.rdd,schema=df1.schema)
df2.printSchema()
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
В качестве альтернативы вы создаете метод, вам нужно более общее решение -
def apply_schema(df1, df2):
schema1 = df1.schema
schema2 = df2.schema
data_types = {field.name: field.dataType for field in schema1.fields}
for field in schema2.fields:
column_name = field.name
if column_name in data_types:
column_type = data_types[column_name]
df2 = df2.withColumn(column_name, df2[column_name].cast(column_type))
return df2
И используйте этот метод, чтобы наложить схему df1 на df2 -
df2 = apply_schema(df1, df2)
print("Schema of df1:")
df1.printSchema()
print("Schema of df2:")
df2.printSchema()
df2.show()
Schema of df1:
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
Schema of df2:
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
+----------+---+---+
| A| B| C|
+----------+---+---+
|2022-01-04| 4| D|
|2022-01-05| 5| E|
|2022-01-06| 6| F|
+----------+---+---+
Я проверил запрошенный сценарий, и он работает для меня.
о.. какую версию искры вы пробовали? Пробовал 3.2.1
получил ошибку Caused by: org.apache.spark.api.python.PythonException: 'TypeError: field B: LongType can not accept object '2' in type <class 'str'>'
Я тестировал в искре 3.3.0
и 3.1.2
работал в обеих версиях. Даже работал с входными кадрами данных, которые являются частью вашего ответа. :)
Странно, не уверен, есть ли какие-то проблемы с моей настройкой... Спасибо за тест :-)
оба метода работали для меня.
Да, это возможно динамически с помощью dataframe.schema.fields
df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
Example:
from pyspark.sql.functions import *
df1 = spark.createDataFrame([('2022-02-02',2,'a')],['A','B','C']).withColumn("A",to_date(col("A")))
print("df1 Schema")
df1.printSchema()
#df1 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)
df2 = spark.createDataFrame([('2022-02-02','2','a')],['A','B','C'])
print("df2 Schema")
df2.printSchema()
#df2 Schema
#root
# |-- A: string (nullable = true)
# |-- B: string (nullable = true)
# |-- C: string (nullable = true)
#
#casting the df2 columns by getting df1 schema using select clause
df3 = df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
df3.show(10,False)
print("df3 Schema")
df3.printSchema()
#+----------+---+---+
#|A |B |C |
#+----------+---+---+
#|2022-02-02|2 |a |
#+----------+---+---+
#df3 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)
В этом примере я определил df1 с помощью Integer,date,long types
.
df2
определяется с помощью string
типа.
df3
определяется с использованием df2
в качестве исходных данных и прикрепленного df1 schema
.
Метод
df2 = spark.createDataFrame(data=df2.rdd,schema=df1.schema)
не работает для преобразования типовstring
вinteger,long,date
..!