У меня есть некоторые потоковые данные, которые можно минимально уменьшить следующим образом:
{
"data":[
{
"key":1,
"val":"a"
},
{
"key":2,
"val":"b",
"test":"bla"
}
]
}
из которого мне нужно получить доступ к массиву "data"
, который представляет собой строку формата JSON. А точнее мне нужно найти в JSON поле "val"
, где "key"==2
.
До сих пор я пробовал:
Я знаю, что могу получить к нему доступ следующим образом:
F.get_json_object(...,"$.data[1].val")
но если JSON изменит порядок объектов в массиве data
, он больше не будет работать.
Для JSON я мог бы использовать:
F.get_json_object(...,"$.data[?(@.key==2)].val")
но, похоже, это не работает с Databricks.
Я попытался динамически создать структуру из строки JSON. Но «Запросы с потоковыми источниками должны выполняться с помощью writeStream.start()». Но я не хочу куда-либо записывать поток, так как я все еще нахожусь на препроцессинге. Или как я могу обойти это?
Я пытался определить только структуру массива, как показано здесь, но поскольку элементы массива имеют разную структуру, это не работает.
Я попытался написать пользовательскую функцию для доступа к объекту data
и содержащую строку JSON, которую затем разобрал бы следующим образом:
def parse_json(id,idName,keyName,jsonString):
from json import loads
data=loads(jsonString)
res=[d[keyName] for d in data if d[idName]==id]
return res[0]
и попытался вызвать его с помощью jsonString=F.col("data")
, где "data"
удерживает строку. Но это выдает мне ошибки, говоря, что он не находит атрибут, который я ввел в поле id
.
Можете ли вы попробовать все вышеперечисленное и сообщить мне, помогло ли это вам?
@DileepRajNarayanThumula, у меня это работает. пожалуйста, опубликуйте это как ответ.
@DuesserBeast Я добавил решение
Не могли бы вы рассказать подробнее о «различной структуре»? насколько сильно варьируется, вы всегда ожидаете, что как минимум key
и val
существует внутри data
?
@Эмма, конечно! Всегда будет поле key
, и я понимаю, что всегда будет поле val
, которое меня интересует. Может быть разное количество (в зависимости от ключа; т. е. для каждого ключа существует своя структура) других полей, которые - для целей вопроса - меня не интересуют. однако, если ответ является общим и может справиться с его захватом, это было бы еще лучше!
Я попробовал следующий подход:
import json
def get_val_for_key(json_str, key_val):
data = json.loads(json_str)
for item in data['data']:
if item.get('key') == key_val:
return item.get('val')
return None
get_val_for_key_udf = udf(get_val_for_key, StringType())
result_df = df.withColumn("desired_val", get_val_for_key_udf(F.col("json_column"), lit(2)))
display(result_df)
Результаты:
json_column desired_val
{"data":[{"key":1,"val":"a"},{"key":2,"val":"b","test":"bla"}]} b
В приведенном выше коде анализируется строка JSON, выполняется динамический поиск «val
», где «key
» == 2, и обрабатываются различные структуры JSON
или порядки ключей.
Я подожду с принятием пару дней на случай, если появится более естественный подход... я надеялся, что он будет. в противном случае ваш подход работает отлично!
@DuesserBaest Заранее спасибо.
Один из подходов — преобразовать строковый JSON в массив типа структуры, а затем получить нужное значение.
Несмотря на то, что структура варьируется, если у вас есть какая-то стабильная схема, вы можете построить схему, включающую в себя то, что вы хотите разложить.
Например, схема ниже будет анализировать поле test
, если оно существует, и вы получите NULL
, когда оно не существует (когда ключ = 1). Кроме того, если вас не интересует поле test
, вы можете опустить StructField
и test
игнорироваться.
schema = StructType([
StructField('data', ArrayType(StructType([
StructField("key", IntegerType()),
StructField("val", StringType()),
StructField("test", StringType()),
# add more field that you are interested in
])))
])
Используйте эту схему в from_json
, затем извлеките нужное поле.
df = (df.withColumn('data', F.from_json('data', schema))
.withColumn('data', F.filter(F.col('data').data, lambda x: x.key == 2)[0].val))
Если key = 2
нет, вы получите NULL
без сильного сбоя.
Это был именно тот ответ, который я искал. Что именно произойдет, если моя схема объявит элементы, которых нет? Эти элементы имеют ценность None
?
def get_val_for_key(json_str, key_val): data = json.loads(json_str) для элемента в данных['data']: if item.get('key') == key_val: return item.get('val') return None get_val_for_key_udf = udf(get_val_for_key, StringType()) result_df = df.withColumn("desired_val", get_val_for_key_udf(F.col("json_column"),lit(2)))