Анализ столбцов Databricks String (XML)

Я пытаюсь проанализировать строки XML, записанные в нескольких строках поля таблицы.

Что у меня есть:

1. Фрейм данных Spark

Кадр данных выглядит следующим образом:

С этой схемой:

root
 |-- Id: decimal(18,0) (nullable = true)
 |-- OrderId: string (nullable = true)
 |-- Action: string (nullable = true)
 |-- TrayportDateTimeUTC: timestamp (nullable = true)
 |-- GV8Data: string (nullable = true)

2. Образец XML

Это образец одной XML-строки:

<GV8APIDATA
    xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
    xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
    xmlns = "gv8api-trayport-com">
    <ORDER EngineID = "1" PersistentOrderID = "60076044" OrderID = "52389917" OldEngineID = "0" OldOrderID = "0" Action = "Remove" DateTime = "2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart = "0" Price = "169.50" Volume = "1" HiddenVolume = "0" PriceDelta = "0.00" Side = "Ask" Status = "Withheld" Company = "Company" CompanyID = "278" Broker = "ICAP" BrokerID = "4" User = "User" UserID = "1910" Trader = "User" TraderID = "1910" OrderType = "GoodTillCancelled" AllOrNone = "false" CounterPartyOk = "false" SystemRank = "55936541" ImpliedType = "None" IsTradable = "No" OrderDealt = "false" TradingCapacity = "DEAL" ExecutionMaker = "19850428" DerivativeIndicator = "false" DEA = "false" LiquidityProvision = "true" ProductClassification = "RM - MIFID" IsMarketData = "true" IsOwnData = "true">
        <INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX" FirstSequenceID = "10000104" SeqSpan = "Single" FirstSequenceItemID = "240" SecondSequenceItemID = "0" FirstSequenceItemName = "Dec-23" SecondSequenceItemName = "" TermFormatID = "2906593310" ExternalInstID = "10641712"/>
    </ORDER>
</GV8APIDATA>

Судя по моим проверкам, XML хорошо отформатирован.

Мой предварительный

1. Быстрый тест Spark Sql

Использование функций from_xml e Schema_of_xml

%sql
SELECT from_xml('<GV8APIDATA
    xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
    xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
    xmlns = "gv8api-trayport-com">
    <ORDER EngineID = "1" PersistentOrderID = "60076044" OrderID = "52389917" OldEngineID = "0" OldOrderID = "0" Action = "Remove" DateTime = "2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart = "0" Price = "169.50" Volume = "1" HiddenVolume = "0" PriceDelta = "0.00" Side = "Ask" Status = "Withheld" Company = "Company" CompanyID = "278" Broker = "ICAP" BrokerID = "4" User = "User" UserID = "1910" Trader = "User" TraderID = "1910" OrderType = "GoodTillCancelled" AllOrNone = "false" CounterPartyOk = "false" SystemRank = "55936541" ImpliedType = "None" IsTradable = "No" OrderDealt = "false" TradingCapacity = "DEAL" ExecutionMaker = "19850428" DerivativeIndicator = "false" DEA = "false" LiquidityProvision = "true" ProductClassification = "RM - MIFID" IsMarketData = "true" IsOwnData = "true">
        <INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX" FirstSequenceID = "10000104" SeqSpan = "Single" FirstSequenceItemID = "240" SecondSequenceItemID = "0" FirstSequenceItemName = "Dec-23" SecondSequenceItemName = "" TermFormatID = "2906593310" ExternalInstID = "10641712"/>
    </ORDER>
</GV8APIDATA>', schema_of_xml('<GV8APIDATA
    xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
    xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
    xmlns = "gv8api-trayport-com">
    <ORDER EngineID = "1" PersistentOrderID = "60076044" OrderID = "52389917" OldEngineID = "0" OldOrderID = "0" Action = "Remove" DateTime = "2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart = "0" Price = "169.50" Volume = "1" HiddenVolume = "0" PriceDelta = "0.00" Side = "Ask" Status = "Withheld" Company = "Company" CompanyID = "278" Broker = "ICAP" BrokerID = "4" User = "User" UserID = "1910" Trader = "User" TraderID = "1910" OrderType = "GoodTillCancelled" AllOrNone = "false" CounterPartyOk = "false" SystemRank = "55936541" ImpliedType = "None" IsTradable = "No" OrderDealt = "false" TradingCapacity = "DEAL" ExecutionMaker = "19850428" DerivativeIndicator = "false" DEA = "false" LiquidityProvision = "true" ProductClassification = "RM - MIFID" IsMarketData = "true" IsOwnData = "true">
        <INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX" FirstSequenceID = "10000104" SeqSpan = "Single" FirstSequenceItemID = "240" SecondSequenceItemID = "0" FirstSequenceItemName = "Dec-23" SecondSequenceItemName = "" TermFormatID = "2906593310" ExternalInstID = "10641712"/>
    </ORDER>
</GV8APIDATA>')) test

и я получаю более или менее ожидаемый результат

2. Комплексный тест Spark Sql

Использование функций from_xml e Schema_of_xml

SELECT from_xml(GV8Data, schema_of_xml('STRUCT<ORDER: STRUCT<INSTSPECIFIER: STRUCT<_ExternalInstID: BIGINT, _FirstSequenceID: BIGINT, _FirstSequenceItemID: BIGINT, _FirstSequenceItemName: STRING, _InstID: BIGINT, _InstName: STRING, _SecondSequenceItemID: BIGINT, _SecondSequenceItemName: STRING, _SeqSpan: STRING, _TermFormatID: BIGINT>, _Action: STRING, _AllOrNone: BOOLEAN, _Broker: STRING, _BrokerID: BIGINT, _Company: STRING, _CompanyID: BIGINT, _CounterPartyOk: BOOLEAN, _DEA: BOOLEAN, _DateTime: TIMESTAMP, _DateTimeNanoSecondsPart: BIGINT, _DerivativeIndicator: BOOLEAN, _EngineID: BIGINT, _ExecutionMaker: BIGINT, _HiddenVolume: BIGINT, _ImpliedType: STRING, _IsMarketData: BOOLEAN, _IsOwnData: BOOLEAN, _IsTradable: STRING, _LiquidityProvision: BOOLEAN, _OldEngineID: BIGINT, _OldOrderID: BIGINT, _OrderDealt: BOOLEAN, _OrderID: BIGINT, _OrderType: STRING, _PersistentOrderID: BIGINT, _Price: DOUBLE, _PriceDelta: DOUBLE, _ProductClassification: STRING, _Side: STRING, _Status: STRING, _SystemRank: BIGINT, _Trader: STRING, _TraderID: BIGINT, _TradingCapacity: STRING, _User: STRING, _UserID: BIGINT, _Volume: BIGINT>, _xmlns: STRING, `_xmlns:xsd`: STRING, `_xmlns:xsi`: STRING>')) test
FROM PARQUET.`path_to_file/file.parquet`

все начинает идти не так

3. Тест Писпарка

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, TimestampType

# INSTSPECIFIER
inst_specifier_schema = StructType([
    StructField("InstID", StringType(), True),
    StructField("InstName", StringType(), True),
    StructField("FirstSequenceID", StringType(), True),
    StructField("SeqSpan", StringType(), True),
    StructField("FirstSequenceItemID", StringType(), True),
    StructField("SecondSequenceItemID", StringType(), True),
    StructField("FirstSequenceItemName", StringType(), True),
    StructField("SecondSequenceItemName", StringType(), True),
    StructField("TermFormatID", StringType(), True),
    StructField("ExternalInstID", StringType(), True)
])

# ORDER
order_schema = StructType([
    StructField("EngineID", StringType(), True),
    StructField("PersistentOrderID", StringType(), True),
    StructField("OrderID", StringType(), True),
    StructField("OldEngineID", StringType(), True),
    StructField("OldOrderID", StringType(), True),
    StructField("Action", StringType(), True),
    StructField("DateTime", StringType(), True),
    StructField("DateTimeNanoSecondsPart", StringType(), True),
    StructField("Price", FloatType(), True),
    StructField("Volume", StringType(), True),
    StructField("HiddenVolume", StringType(), True),
    StructField("PriceDelta", FloatType(), True),
    StructField("Side", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Company", StringType(), True),
    StructField("CompanyID", StringType(), True),
    StructField("Broker", StringType(), True),
    StructField("BrokerID", StringType(), True),
    StructField("User", StringType(), True),
    StructField("UserID", StringType(), True),
    StructField("Trader", StringType(), True),
    StructField("TraderID", StringType(), True),
    StructField("OrderType", StringType(), True),
    StructField("AllOrNone", BooleanType(), True),
    StructField("CounterPartyOk", BooleanType(), True),
    StructField("SystemRank", StringType(), True),
    StructField("ImpliedType", StringType(), True),
    StructField("IsTradable", StringType(), True),
    StructField("OrderDealt", BooleanType(), True),
    StructField("TradingCapacity", StringType(), True),
    StructField("ExecutionMaker", StringType(), True),
    StructField("DerivativeIndicator", BooleanType(), True),
    StructField("DEA", BooleanType(), True),
    StructField("LiquidityProvision", BooleanType(), True),
    StructField("ProductClassification", StringType(), True),
    StructField("IsMarketData", BooleanType(), True),
    StructField("IsOwnData", BooleanType(), True),
    StructField("INSTSPECIFIER", inst_specifier_schema, True)
])

# root
gv8api_data_schema = StructType([
    StructField("ORDER", order_schema, True)
])

df = spark.read.parquet("path_to_file/file.parquet")
parsed_df = df.withColumn("parsed_xml", from_xml("GV8Data",schema=order_schema))

Мои результаты

Я получаю эту непонятную ошибку

Error in callback <bound method UserNamespaceCommandHook.post_run_cell of <dbruntime.DatasetInfo.UserNamespaceCommandHook object at 0x7fe060e563e0>> (for post_run_cell):

Без каких-либо других подробностей.

Что мне не хватает?

Можете ли вы скомпилировать код в Scala? Я также использую эти данные лотка и разработал метод в Scala, скомпилированный в толстой банке.

Devyl 31.05.2024 19:22

Да, у меня нет языковых ограничений

Gam 03.06.2024 09:45
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
2
85
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Я думаю, что это самый уродливый код, который я когда-либо писал; это не решает мой вопрос, но представляет собой рабочее решение. Хотя было бы неплохо сделать его более эффективным, если кто-то знает, как это сделать.

По сути, я использую UDF для преобразования xml в json с помощью xmltodict и json, затем извлекаю структуру из словаря с помощью from_json.

import xmltodict, json
from pyspark.sql.functions import schema_of_json, from_json
from pyspark.sql.types import StructType, StructField, StringType, FloatType, BooleanType


# UDF to convert xml to json

def xmlparse(data):
    result = json.dumps(xmltodict.parse(data)["GV8APIDATA"]["ORDER"])
    return result

xmlparse_udf = udf(xmlparse, StringType()) 

# Json schema

# INSTSPECIFIER element
inst_specifier_schema = StructType([
    StructField("@InstID", StringType(), True),
    StructField("@InstName", StringType(), True),
    StructField("@FirstSequenceID", StringType(), True),
    StructField("@SeqSpan", StringType(), True),
    StructField("@FirstSequenceItemID", StringType(), True),
    StructField("@SecondSequenceItemID", StringType(), True),
    StructField("@FirstSequenceItemName", StringType(), True),
    StructField("@SecondSequenceItemName", StringType(), True),
    StructField("@TermFormatID", StringType(), True),
    StructField("@ExternalInstID", StringType(), True)
])

# ORDER element
order_schema = StructType([
    StructField("@EngineID", StringType(), True),
    StructField("@PersistentOrderID", StringType(), True),
    StructField("@OrderID", StringType(), True),
    StructField("@OldEngineID", StringType(), True),
    StructField("@OldOrderID", StringType(), True),
    StructField("@Action", StringType(), True),
    StructField("@DateTime", StringType(), True),
    StructField("@DateTimeNanoSecondsPart", StringType(), True),
    StructField("@Price", FloatType(), True),
    StructField("@Volume", StringType(), True),
    StructField("@HiddenVolume", StringType(), True),
    StructField("@PriceDelta", FloatType(), True),
    StructField("@Side", StringType(), True),
    StructField("@Status", StringType(), True),
    StructField("@Company", StringType(), True),
    StructField("@CompanyID", StringType(), True),
    StructField("@Broker", StringType(), True),
    StructField("@BrokerID", StringType(), True),
    StructField("@OldBrokerID", StringType(), True),
    StructField("@User", StringType(), True),
    StructField("@UserID", StringType(), True),
    StructField("@Trader", StringType(), True),
    StructField("@TraderID", StringType(), True),
    StructField("@OrderType", StringType(), True),
    StructField("@AllOrNone", BooleanType(), True),
    StructField("@CounterPartyOk", BooleanType(), True),
    StructField("@ImpliedType", StringType(), True),
    StructField("@IsTradable", StringType(), True),
    StructField("@OrderDealt", BooleanType(), True),
    StructField("@Execution", StringType(), True),
    StructField("@IsMarketData", BooleanType(), True),
    StructField("@IsOwnData", BooleanType(), True),
    StructField("INSTSPECIFIER", inst_specifier_schema, True)
])

# Execution

df = spark.read.parquet("path_to_file/file.parquet")
parsed_df = df.withColumn("GV8Data_json", xmlparse_udf("GV8Data")).withColumn("GV8Data_struct", from_json("GV8Data_json",schema=order_schema))

Извини за поздний ответ,

вы можете использовать собственный пакет scala.xml для анализа, например:

затем вы можете скомпилировать толстую банку или пакет, чтобы добавить его в свою искровую работу.

import spark.implicits._

case class ORDER(
  id: String
)

def parse(message: String) = {
  // Split cml content by "TRADE" like
  val node = scala.xml.XML.loadString(message)
  (node \ "ORDER").iterator.map(elem => {
    val id = (elem \ "@OrderID").text
    ORDER(id)
  })
}

val message = "<GV8APIDATA\n    xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\"\n    xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n    xmlns=\"gv8api-trayport-com\">\n    <ORDER EngineID=\"1\" PersistentOrderID=\"60076044\" OrderID=\"52389917\" OldEngineID=\"0\" OldOrderID=\"0\" Action=\"Remove\" DateTime=\"2023-10-14T00:16:33.0467795Z\" DateTimeNanoSecondsPart=\"0\" Price=\"169.50\" Volume=\"1\" HiddenVolume=\"0\" PriceDelta=\"0.00\" Side=\"Ask\" Status=\"Withheld\" Company=\"Company\" CompanyID=\"278\" Broker=\"ICAP\" BrokerID=\"4\" User=\"User\" UserID=\"1910\" Trader=\"User\" TraderID=\"1910\" OrderType=\"GoodTillCancelled\" AllOrNone=\"false\" CounterPartyOk=\"false\" SystemRank=\"55936541\" ImpliedType=\"None\" IsTradable=\"No\" OrderDealt=\"false\" TradingCapacity=\"DEAL\" ExecutionMaker=\"19850428\" DerivativeIndicator=\"false\" DEA=\"false\" LiquidityProvision=\"true\" ProductClassification=\"RM - MIFID\" IsMarketData=\"true\" IsOwnData=\"true\">\n        <INSTSPECIFIER InstID=\"10641712\" InstName=\"Germany Peaks EEX\" FirstSequenceID=\"10000104\" SeqSpan=\"Single\" FirstSequenceItemID=\"240\" SecondSequenceItemID=\"0\" FirstSequenceItemName=\"Dec-23\" SecondSequenceItemName=\"\" TermFormatID=\"2906593310\" ExternalInstID=\"10641712\"/>\n    </ORDER>\n</GV8APIDATA>"
val ds = Seq(message).toDS.flatMap(parse)
ds.show(false)

Другие вопросы по теме