У меня есть образец файла данных json, как показано ниже:
{"data_id":"1234","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":true,"familyname":true,"swimming_pool":true}}}
{"data_id":"6789","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":false,"familyname":true}}}
{"data_id":"5678","risk_characteristics":{"indicators":{"alcohol":false}}}
Я преобразовал файл json в паркет и загрузил его в улей, используя приведенный ниже код.
dataDF = spark.read.json("path/Datasmall.json")
dataDF.write.parquet("data.parquet")
parqFile = spark.read.parquet("data.parquet")
parqFile.write.saveAsTable("indicators_table", format='parquet', mode='append', path='/externalpath/indicators_table/')
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
fromHiveDF = hive_context.table("default.indicators_table")
fromHiveDF.show()
indicatorsDF = fromHiveDF.select('data_id', 'risk_characteristics.indicators')
indicatorsDF.printSchema()
root
|-- data_id: string (nullable = true)
|-- indicators: struct (nullable = true)
| |-- alcohol: boolean (nullable = true)
| |-- house: boolean (nullable = true)
| |-- business: boolean (nullable = true)
| |-- familyname: boolean (nullable = true)
indicatorsDF.show()
+-------+--------------------+
|data_id| indicators|
+-------+--------------------+
| 1234|[true, true, true...|
| 6789|[true, false, tru...|
| 5678| [false,,,,]|
+-------+--------------------+
Вместо того, чтобы извлекать данные как select data_id, индикаторы.алкоголь, индикаторы.дом и т. д., Я просто хочу получить файл данных паркета только с тремя столбцами ниже. То есть поля структуры преобразуются в строки под именем столбца индикаторов_типа.
data_id indicators_type indicators_value
1234 alcohol T
1234 house T
1234 business T
1234 familyname T
1234 swimming_ppol T
6789 alcohol T
6789 house F
6789 business T
6789 familyname F
5678 alcohol F
Могу я спросить, как это сделать. Я пытаюсь сделать это с помощью pyspark. Также есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих реальных данных данные структуры могут выходить за пределы фамилии, и их может быть даже 100.
Большое спасибо
@Srinivas я добавил образцы данных
Используйте stack
, чтобы сложить столбцы:
df.show()
+-------+--------------------------+
|data_id|indicators |
+-------+--------------------------+
|1234 |[true, true, false, true] |
|6789 |[true, false, true, false]|
+-------+--------------------------+
stack_expr = 'stack(' + str(len(df.select('indicators.*').columns)) + ', ' + ', '.join(["'%s', indicators.%s" % (col,col) for col in df.select('indicators.*').columns]) + ') as (indicators_type, indicators_value)'
df2 = df.selectExpr(
'data_id',
stack_expr
)
df2.show()
+-------+---------------+----------------+
|data_id|indicators_type|indicators_value|
+-------+---------------+----------------+
| 1234| alcohol| true|
| 1234| house| true|
| 1234| business| false|
| 1234| familyname| true|
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
+-------+---------------+----------------+
Есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих реальных данных данные структуры могут выходить за пределы фамилии, и их может быть даже 100.
решение отлично работает с примерами входных данных. Однако мой базовый файл имеет формат паркета (я преобразовал исходные данные json в формат паркета). Когда я применяю один и тот же код к базовым данным паркета, я получаю следующую ошибку: стек(39, 'алкоголь', indicators
.alcohol
, ....................]паркет\n"
я обновил описание с более подробной информацией о входных данных
решение действительно работает. я получал ошибку из-за несоответствия типа в одном из элементов структуры. Вместо того, чтобы все элементы структуры были логическими, один из них был int. Есть ли способ установить индикаторы_значения как строковый тип как часть команды стека?
@ sabra2121 Наверное да, попробуйте заменить indicators.%s
на cast(indicators.%s as string)
работает отлично .. Большое спасибо @mck! размещение обновленного выражения: stack_expr = 'stack(' + str(len(indDF.select('indicators.*').columns)) + ', ' + ', '.join(["'%s', cast( индикаторы.%s как строка)" % (столбец,столбец) для столбца в indDF.select('indicators.*').columns]) + ') as (indicators_type, индикаторы_значение)'
Другое решение с использованием взрыва:
val df = spark.sql(""" with t1(
select 1234 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
union
select 6789 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
)
select * from t1
""")
df.show(false)
df.printSchema
+-------+--------------------------+
|data_id|indicators |
+-------+--------------------------+
|6789 |[true, false, true, false]|
|1234 |[true, false, true, false]|
+-------+--------------------------+
root
|-- data_id: integer (nullable = false)
|-- indicators: struct (nullable = false)
| |-- alcohol: boolean (nullable = false)
| |-- house: boolean (nullable = false)
| |-- business: boolean (nullable = false)
| |-- familyname: boolean (nullable = false)
val df2 = df.withColumn("x", explode(array(
map(lit("alcohol") ,col("indicators.alcohol")),
map(lit("house"), col("indicators.house")),
map(lit("business"), col("indicators.business")),
map(lit("familyname"), col("indicators.familyname"))
)))
df2.select(col("data_id"),map_keys(col("x"))(0), map_values(col("x"))(0)).show
+-------+--------------+----------------+
|data_id|map_keys(x)[0]|map_values(x)[0]|
+-------+--------------+----------------+
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
| 1234| alcohol| true|
| 1234| house| false|
| 1234| business| true|
| 1234| familyname| false|
+-------+--------------+----------------+
Обновление-1:
Чтобы динамически получать столбцы структуры индикаторов, используйте описанный ниже подход.
val colsx = df.select("indicators.*").columns
colsx: Array[String] = Array(alcohol, house, business, familyname)
val exp1 = colsx.map( x => s""" map("${x}", indicators.${x}) """ ).mkString(",")
val exp2 = " explode(array( " + exp1 + " )) "
val df2 = df.withColumn("x",expr(exp2))
df2.select(col("data_id"),map_keys(col("x"))(0).as("indicator_key"), map_values(col("x"))(0).as("indicator_value")).show
+-------+-------------+---------------+
|data_id|indicator_key|indicator_value|
+-------+-------------+---------------+
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
| 1234| alcohol| true|
| 1234| house| false|
| 1234| business| true|
| 1234| familyname| false|
+-------+-------------+---------------+
Это помогает, но есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих реальных данных данные структуры могут выходить за пределы фамилии, и их может быть даже 100. Я добавлю этот пункт также в основной раздел
Вы можете проверить update1 в ответе.
опубликовать образцы данных?