предположим, что у нас есть следующая структура json:
{
"positions": {
"node": "abc"
}
"submissions" :{
"submissionOffsets":[
{
"attributeName": "sample1",
"attributeValue": 1224
},
{
"attributeName": "sample2",
"attributeValue": 1224
},
{
"attributeName": "sample3",
"attributeValue": 1224
},
{
"attributeName": "sample4",
"attributeValue": 1224
}
}
}
и мы хотим прочитать «submissionOffsets» и извлечь attributeName и attributeValue на основе имени атрибута, например «sample1», и ожидаемая структура должна быть в случае Match
{
"positions": {
"node": "abc"
}
"submissions" :{
"submissionOffsets":[
{
"attributeName": "sample1",
"attributeValue": 1224
},
{
"attributeName": "sample2",
"attributeValue": 1224
},
{
"attributeName": "sample3",
"attributeValue": 1224
},
{
"attributeName": "sample4",
"attributeValue": 1224
}
},
"attributeName": "sample1",
"attributeValue": 1224
}
**Incase of No Match**
{
"positions": {
"node": "abc"
}
"submissions" :{
"submissionOffsets":[
{
"attributeName": "sample1",
"attributeValue": 1224
},
{
"attributeName": "sample2",
"attributeValue": 1224
},
{
"attributeName": "sample3",
"attributeValue": 1224
},
{
"attributeName": "sample4",
"attributeValue": 1224
}
},
"attributeName": null,
"attributeValue": 0.00
}
Это должно быть сделано в кадрах данных
Я пытался с фреймами данных, я взорвал submits.submissionOffsets , затем проверил имя и значение атрибута, но это дало один столбец, и я должен присоединить его к исходному фрейму данных.
Если какое-либо свойство json содержит null
, искра будет удалена и не будет доступна в вашем конечном результате.
filter
функция высшего порядка для фильтрации определенного атрибута из вложенного массива json.
inline
или inline_outer
- чтобы взорвать значения массива.
Ниже приведен пример кода
scala> val df = spark
.read
.option("multiLine", "true")
.json(Seq(data).toDS)
df: org.apache.spark.sql.DataFrame = [positions: struct<node: string>, submissions: struct<submissionOffsets: array<struct<attributeName:string,attributeValue:bigint>>>]
scala> df.show(false)
+---------+----------------------------------------------------------------------+
|positions|submissions |
+---------+----------------------------------------------------------------------+
|{abc} |{[{sample1, 1224}, {sample2, 1224}, {sample3, 1224}, {sample4, 1224}]}|
+---------+----------------------------------------------------------------------+
scala> df.printSchema
root
|-- positions: struct (nullable = true)
| |-- node: string (nullable = true)
|-- submissions: struct (nullable = true)
| |-- submissionOffsets: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- attributeName: string (nullable = true)
| | | |-- attributeValue: long (nullable = true)
scala>
// filter higher order function to filter nested json array values / attributes
// inline_outer is to explode array values
df
.selectExpr(
"*", // to select all columns from the dataset / dataframe
"inline_outer(filter(submissions.submissionOffsets, i -> i.attributeName == 'sample1')) as (attributeName, attributeValue)"
)
.toJSON.show(false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"positions":{"node":"abc"},"submissions":{"submissionOffsets":[{"attributeName":"sample1","attributeValue":1224},{"attributeName":"sample2","attributeValue":1224},{"attributeName":"sample3","attributeValue":1224},{"attributeName":"sample4","attributeValue":1224}]},"attributeName":"sample1","attributeValue":1224}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Спасибо, это работает, но можем ли мы внести некоторые изменения, чтобы он возвращал остальные наборы данных в случае совпадения?
Я не понял тебя. не могли бы вы обновить свой вопрос с ожидаемым результатом в случае несоответствия?
Давайте предположим, если я изменю критерии фильтра на sample5 df .selectExpr("*", // для выбора всех столбцов из набора данных/dataframe "inline(filter(submissions.submissionOffsets, i -> i.attributeName == 'sample5')) as (attributeName, attributeValue)" ) Затем я ожидаю, что значения возвращенных позиций и затем представлений, а затем attrbuteName и attrbuteValue будут пустыми, но позиции и представления должны иметь такие значения, как есть
понятно. В случае, если фильтр не соответствует, какие будут значения по умолчанию для attributeName
и attributeValue
?
обновленный код. добавлена функция inline_outer
для сохранения структуры в случае несоответствия фильтра.
Спасибо, это работает, как мы можем обновить значения как 0 вместо нуля
хорошо, если получится. не могли бы вы принять решение и отметить его как правильный ответ?
да, конечно, можете ли вы помочь мне заполнить 0 вместо нуля в attributeName и attributeValue
конечно, не могли бы вы обновить свой вопрос с помощью приведенного выше варианта использования?
вы имеете в виду новый вопрос?
нет, в этом вопросе только ниже вы можете добавить вышеуказанный вариант использования.
Обновленный вопрос
Вы забыли опубликовать свою попытку решить эту проблему.