Разбор столбца фрейма данных Pyspark строки JSON, который имеет строку массива в одном из столбцов

Я пытаюсь прочитать файл JSON и проанализировать «jsonString» и базовые поля, которые включают массив, в фрейм данных pyspark.

Вот содержимое файла json.

[{"jsonString": "{\"uid\":\"value1\",\"adUsername\":\"value3\",\"courseCertifications\":[{\"uid\":\"value2\",\"courseType\":\"TRAINING\"},{\"uid\":\"TEST\",\"courseType\":\"TRAINING\"}],\"modifiedBy\":\"value4\"}","transactionId": "value5", "tableName": "X"},
 {"jsonString": "{\"uid\":\"value11\",\"adUsername\":\"value13\",\"modifiedBy\":\"value14\"}","transactionId": "value15", "tableName": "X1"},
 {"jsonString": "{\"uid\":\"value21\",\"adUsername\":\"value23\",\"modifiedBy\":\"value24\"}","transactionId": "value25", "tableName": "X2"}]

Я могу проанализировать содержимое строки «jsonString» и выбрать необходимые столбцы, используя приведенную ниже логику.

df = spark.read.json('path.json',multiLine=True)
df = df.withColumn('courseCertifications', explode(array(get_json_object(df['jsonString'],'$.courseCertifications'))))

Теперь моя конечная цель — проанализировать поле «courseType» из «courseCertifications» и создать одну строку для каждого экземпляра.

Я использую логику ниже, чтобы получить «courseType»

df = df.withColumn('new',get_json_object(df.courseCertifications, '$[*].courseType'))

Я могу получить содержимое «courseType», но в виде строки, как показано ниже.

[Row(new=u'["TRAINING","TRAINING"]')]

Моя конечная цель - создать фрейм данных со столбцами transactionId, jsonString.uid, jsonString.adUsername, jsonString.courseCertifications.uid, jsonString.courseCertifications.courseType

  • Мне нужно сохранить все строки и создать несколько строк по одной для экземпляров массива courseCertifications.uid/courseCertifications.courseType.
Структурированный массив Numpy
Структурированный массив Numpy
Однако в реальных проектах я чаще всего имею дело со списками, состоящими из нескольких типов данных. Как мы можем использовать массивы numpy, чтобы...
T - 1Bits: Генерация последовательного массива
T - 1Bits: Генерация последовательного массива
По мере того, как мы пишем все больше кода, мы привыкаем к определенным способам действий. То тут, то там мы находим код, который заставляет нас...
Что такое деструктуризация массива в JavaScript?
Что такое деструктуризация массива в JavaScript?
Деструктуризация позволяет распаковывать значения из массивов и добавлять их в отдельные переменные.
0
0
754
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Элегантный способ решить ваш вопрос — создать схему строки json, а затем проанализировать ее с помощью функции from_json.

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql.types import ArrayType, StringType, StructType, StructField

df = spark.read.json('your_path', multiLine=True)
schema = StructType([
    StructField('uid', StringType()),
    StructField('adUsername', StringType()),
    StructField('modifiedBy', StringType()),
    StructField('courseCertifications', ArrayType(
        StructType([
            StructField('uid', StringType()),
            StructField('courseType', StringType())
        ])
    ))
])

df = df \
    .withColumn('tmp', f.from_json(df.jsonString, schema)) \
    .withColumn('adUsername', f.col('tmp').adUsername) \
    .withColumn('uid', f.col('tmp').uid) \
    .withColumn('modifiedBy', f.col('tmp').modifiedBy) \
    .withColumn('tmp', f.explode(f.col('tmp').courseCertifications)) \
    .withColumn('course_uid', f.col('tmp').uid) \
    .withColumn('course_type', f.col('tmp').courseType) \
    .drop('jsonString', 'tmp')
df.show()

Выход:

+-------------+------+----------+----------+----------+-----------+
|transactionId|uid   |adUsername|modifiedBy|course_uid|course_type|
+-------------+------+----------+----------+----------+-----------+
|value5       |value1|value3    |value4    |value2    |TRAINING   |
|value5       |value1|value3    |value4    |TEST      |TRAINING   |
+-------------+------+----------+----------+----------+-----------+

Спасибо @Kafels! Это работает, вместо настраиваемой схемы я делаю следующее, чтобы получить структуру jsonString, поскольку она может содержать разные столбцы в зависимости от файла JSON. json_schema = spark.read.json(df.rdd.map(lambda row: row.jsonString)).schema df = df.withColumn('jsonString', from_json(df['jsonString'], json_schema)) Пожалуйста, поделитесь своими мыслями.

Gopal 03.06.2019 17:28

Код вашего предложения очень хорош, потому что нет необходимости отображать карту, в схеме всегда есть новый или удаленный столбец из файла JSON.

Kafels 03.06.2019 19:18

Спасибо @Kafels, я заметил, что сохраняются только записи, имеющие поле «courseCertifications», а все остальные записи удаляются. Мое требование состоит в том, чтобы получить все записи и заполнить NULL, если поле «courseCertifications» отсутствует для какой-либо строки.

Gopal 04.06.2019 01:47

Отредактируйте JSON в своем вопросе и добавьте больше значений для отладки.

Kafels 04.06.2019 02:26

обновлен вопрос с большим количеством значений, а также добавлена ​​дополнительная информация о o/p Dataframe.

Gopal 04.06.2019 04:48

Другие вопросы по теме