У меня есть многоуровневый вложенный фрейм данных, как показано ниже:
DataFrame[date_time: timestamp, filename: string, label: string, description: string,
feature_set: array<struct<direction:string,tStart:double,tEnd:double,
features:array<struct<field1:string,field2:string,field3:string,field4:string>>>>]
и его значения:
[[datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1', [['east', 78.23018987, 79.23010199, [['fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14']]], ['west', 78.23018987, 79.23010199, [['fld_val21', 'fld_val22', 'fld_val23', 'fld_val24']]], ['south', 78.23018987, 79.23010199, [['fld_val31', 'fld_val32', 'fld_val33', 'fld_val34']]]
root
|-- date_time: timestamp (nullable = true)
|-- filename: string (nullable = true)
|-- label: string (nullable = true)
|-- description: string (nullable = true)
|-- feature_set: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- direction: string (nullable = true)
| | |-- tStart: double (nullable = true)
| | |-- tEnd: double (nullable = true)
| | |-- features: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- field1: string (nullable = true)
| | | | |-- field2: string (nullable = true)
| | | | |-- field3: string (nullable = true)
| | | | |-- field4:string (nullable = true)
Я пытаюсь сгладить его таким образом, чтобы он выглядел следующим образом:
-------------------+--------------------+--------------------+--------------------+--------------------+
| date_time| filename| label| description| feature_set_direction| feature_set_tStart| feature_set_tEnd| feature_set_features_Field1| feature_set_features_Field2| feature_set_features_Field3| feature_set_features_Field4|
+-------------------+--------------------+--------------------+--------------------+--------------------+
|2022-08-24 13:47:47|filename1|label1| description of file 1|east| 78.230189787|79.23010199| fld_val11| fld_val12| fld_Val13| fld_Val14|
+-------------------+--------------------+--------------------+--------------------+--------------------+
root
|-- date_time: timestamp (nullable = true)
|-- filename: string (nullable = true)
|-- label: string (nullable = true)
|-- description: string (nullable = true)
|-- feature_set: array (nullable = true)
|-- feature_set_element: struct (containsNull = true)
|-- feature_set_element_direction: string (nullable = true)
|-- feature_set_element_tStart: double (nullable = true)
|-- feature_set_element_tEnd: double (nullable = true)
|-- feature_set_element_features: array (nullable = true)
|-- feature_set_element_features_element: struct (containsNull = true)
|-- feature_set_element_features_element_field1: string (nullable = true)
|-- feature_set_element_features_element_field2: string (nullable = true)
|-- feature_set_element_features_element_field3: string (nullable = true)
|-- feature_set_element_features_element_field4:string (nullable = true)
Я попытался сгладить его с помощью приведенного ниже кода, но его ошибка:
flat_df = df.select("date_time", "filename", "label", "description", "feature_set.*")
AnalysisException: может расширять только типы структурных данных. Атрибут:
ArrayBuffer(feature_set).
Я также пробовал использовать несколько других методов, таких как val, но безуспешно.
val df2 = df.select(col("date_time"),
col("filename"),
col("label"),
col("description"),
col("feature_set"))
SyntaxError: неверный синтаксис (, строка 1) Файл: 1 val df2 = df.select(col("date_time")
Может ли кто-нибудь предложить, как продолжить?
Спасибо.






Поскольку вы работаете с массивами, вы не можете просто выбрать подстолбцы: сначала вам нужно будет использовать explode.
Этот блок кода предназначен только для воссоздания ваших данных (вам не нужно этого делать):
import datetime
from pyspark.sql.types import *
schema = StructType([
StructField("date_time", TimestampType(), True),
StructField("filename", StringType(), True),
StructField("label", StringType(), True),
StructField("description", StringType(), True),
StructField("feature_set",
ArrayType(
StructType([
StructField("direction", StringType(), True),
StructField("tStart", DoubleType(), True),
StructField("tEnd", DoubleType(), True),
StructField("features",
ArrayType(
StructType([
StructField("field1", StringType(), True),
StructField("field2", StringType(), True),
StructField("field3", StringType(), True),
StructField("field4", StringType(), True),
])
), True),
])
)
)
])
data = [ ( datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1', [ ( 'east', 78.23018987, 79.23010199, [ ( 'fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14') ]), ( 'west', 78.23018987, 79.23010199, [ ( 'fld_val21', 'fld_val22', 'fld_val23', 'fld_val24') ]), ( 'south', 78.23018987, 79.23010199, [ ( 'fld_val31', 'fld_val32', 'fld_val33', 'fld_val34') ]) ]) ]
df = spark.createDataFrame(data, schema)
Теперь, когда у нас есть данные, нам нужно использовать комбинацию select и explode, чтобы получить нужную схему:
from pyspark.sql import functions as F
flat_df = df \
.withColumn("feature_set", F.explode("feature_set")) \
.select("date_time", "filename", "label", "description", "feature_set.*") \
.withColumn("features", F.explode("features")) \
.select("date_time", "filename", "label", "description", "direction", "tStart", "tEnd", "features.*")
Итак, как вы видите, мы explode создаем каждый столбец массива и затем разворачиваем их. Таким образом, вы говорите Spark создать строку в результирующем фрейме данных для каждого элемента в имеющихся у вас массивах.
Выходная схема не будет точно такой же, как то, что вы запрашиваете (например, массива feature_set больше нет), но это делает более компактную версию без дублирования данных. Результирующая схема и фрейм данных выглядят следующим образом:
>>> flat_df.printSchema()
root
|-- date_time: timestamp (nullable = true)
|-- filename: string (nullable = true)
|-- label: string (nullable = true)
|-- description: string (nullable = true)
|-- direction: string (nullable = true)
|-- tStart: double (nullable = true)
|-- tEnd: double (nullable = true)
|-- field1: string (nullable = true)
|-- field2: string (nullable = true)
|-- field3: string (nullable = true)
|-- field4: string (nullable = true)
>>> flat_df.show()
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
| date_time| filename| label| description|direction| tStart| tEnd| field1| field2| field3| field4|
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
|2022-08-24 07:51:54|filename1|label1|description of fi...| east|78.23018987|79.23010199|fld_val11|fld_val12|fld_Val13|fld_Val14|
|2022-08-24 07:51:54|filename1|label1|description of fi...| west|78.23018987|79.23010199|fld_val21|fld_val22|fld_val23|fld_val24|
|2022-08-24 07:51:54|filename1|label1|description of fi...| south|78.23018987|79.23010199|fld_val31|fld_val32|fld_val33|fld_val34|
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
Спасибо за ответ. В то время как он очень хорошо расширяет последний уровень значений (Field1, Field2, Field3, field4), он игнорирует более высокие уровни (tStart, tEnd, Filename, Labels и т. д.). Не уверен, что сделал какую-то ошибку. Но если у вас есть какие-либо мысли, пожалуйста, дайте мне знать
+------+--+-----+---------+---------+ -------------------+--------+------------------ -+--------------------+-------+ ------+--------------------+---- ---------------+----+----------- ---------+ |Поле1| Поле2| Поле3|Поле4|
Можете ли вы отредактировать свой вопрос и добавить несколько примеров входных данных? Он может быть в формате json/csv/в любом другом формате, который вы используете. Тогда я обязательно дам вам полностью работоспособное решение :)
Я пытался обновить. Пожалуйста, дайте мне знать, если это нормально.
Я адаптировал свой ответ, дайте мне знать, если это то, что вы ищете!
Когда у вас есть столбец типа array (например, feature_set: array), вы должны взорвать его перед выполнением «feature_set.*»