I'm trying to parse XML strings stored across multiple rows of a table column.
1. Spark DataFrame
The DataFrame has this schema:
root
|-- Id: decimal(18,0) (nullable = true)
|-- OrderId: string (nullable = true)
|-- Action: string (nullable = true)
|-- TrayportDateTimeUTC: timestamp (nullable = true)
|-- GV8Data: string (nullable = true)
2. Sample XML
This is a sample of one XML string:
<GV8APIDATA
xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns = "gv8api-trayport-com">
<ORDER EngineID = "1" PersistentOrderID = "60076044" OrderID = "52389917" OldEngineID = "0" OldOrderID = "0" Action = "Remove" DateTime = "2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart = "0" Price = "169.50" Volume = "1" HiddenVolume = "0" PriceDelta = "0.00" Side = "Ask" Status = "Withheld" Company = "Company" CompanyID = "278" Broker = "ICAP" BrokerID = "4" User = "User" UserID = "1910" Trader = "User" TraderID = "1910" OrderType = "GoodTillCancelled" AllOrNone = "false" CounterPartyOk = "false" SystemRank = "55936541" ImpliedType = "None" IsTradable = "No" OrderDealt = "false" TradingCapacity = "DEAL" ExecutionMaker = "19850428" DerivativeIndicator = "false" DEA = "false" LiquidityProvision = "true" ProductClassification = "RM - MIFID" IsMarketData = "true" IsOwnData = "true">
<INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX" FirstSequenceID = "10000104" SeqSpan = "Single" FirstSequenceItemID = "240" SecondSequenceItemID = "0" FirstSequenceItemName = "Dec-23" SecondSequenceItemName = "" TermFormatID = "2906593310" ExternalInstID = "10641712"/>
</ORDER>
</GV8APIDATA>
As far as I can tell, the XML is well-formed.
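One quick way to double-check well-formedness outside Spark is Python's standard-library parser. The sample below is a trimmed copy of the XML above (only a few attributes kept), and `SAMPLE` and `is_well_formed` are illustrative names. It also shows that the default namespace `xmlns = "gv8api-trayport-com"` qualifies every element name, which matters for any namespace-aware parser.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the sample row; spaces around '=' are legal XML.
SAMPLE = """<GV8APIDATA
 xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
 xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
 xmlns = "gv8api-trayport-com">
 <ORDER OrderID = "52389917" Action = "Remove" Price = "169.50">
 <INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX"/>
 </ORDER>
</GV8APIDATA>"""


def is_well_formed(xml_text):
    """Return True if xml_text parses as a single well-formed XML document."""
    try:
        ET.fromstring(xml_text)
        return True
    except ET.ParseError:
        return False


root = ET.fromstring(SAMPLE)
print(is_well_formed(SAMPLE))  # True
print(root.tag)                # {gv8api-trayport-com}GV8APIDATA
print(root.find("ORDER"))      # None: the unqualified name does not match
```

With the default namespace in place, child lookups need the qualified name, e.g. `root.find("{gv8api-trayport-com}ORDER")`.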
3. Quick Spark SQL test
Using the from_xml and schema_of_xml functions:
%sql
SELECT from_xml('<GV8APIDATA
xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns = "gv8api-trayport-com">
<ORDER EngineID = "1" PersistentOrderID = "60076044" OrderID = "52389917" OldEngineID = "0" OldOrderID = "0" Action = "Remove" DateTime = "2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart = "0" Price = "169.50" Volume = "1" HiddenVolume = "0" PriceDelta = "0.00" Side = "Ask" Status = "Withheld" Company = "Company" CompanyID = "278" Broker = "ICAP" BrokerID = "4" User = "User" UserID = "1910" Trader = "User" TraderID = "1910" OrderType = "GoodTillCancelled" AllOrNone = "false" CounterPartyOk = "false" SystemRank = "55936541" ImpliedType = "None" IsTradable = "No" OrderDealt = "false" TradingCapacity = "DEAL" ExecutionMaker = "19850428" DerivativeIndicator = "false" DEA = "false" LiquidityProvision = "true" ProductClassification = "RM - MIFID" IsMarketData = "true" IsOwnData = "true">
<INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX" FirstSequenceID = "10000104" SeqSpan = "Single" FirstSequenceItemID = "240" SecondSequenceItemID = "0" FirstSequenceItemName = "Dec-23" SecondSequenceItemName = "" TermFormatID = "2906593310" ExternalInstID = "10641712"/>
</ORDER>
</GV8APIDATA>', schema_of_xml('<GV8APIDATA
xmlns:xsd = "http://www.w3.org/2001/XMLSchema"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns = "gv8api-trayport-com">
<ORDER EngineID = "1" PersistentOrderID = "60076044" OrderID = "52389917" OldEngineID = "0" OldOrderID = "0" Action = "Remove" DateTime = "2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart = "0" Price = "169.50" Volume = "1" HiddenVolume = "0" PriceDelta = "0.00" Side = "Ask" Status = "Withheld" Company = "Company" CompanyID = "278" Broker = "ICAP" BrokerID = "4" User = "User" UserID = "1910" Trader = "User" TraderID = "1910" OrderType = "GoodTillCancelled" AllOrNone = "false" CounterPartyOk = "false" SystemRank = "55936541" ImpliedType = "None" IsTradable = "No" OrderDealt = "false" TradingCapacity = "DEAL" ExecutionMaker = "19850428" DerivativeIndicator = "false" DEA = "false" LiquidityProvision = "true" ProductClassification = "RM - MIFID" IsMarketData = "true" IsOwnData = "true">
<INSTSPECIFIER InstID = "10641712" InstName = "Germany Peaks EEX" FirstSequenceID = "10000104" SeqSpan = "Single" FirstSequenceItemID = "240" SecondSequenceItemID = "0" FirstSequenceItemName = "Dec-23" SecondSequenceItemName = "" TermFormatID = "2906593310" ExternalInstID = "10641712"/>
</ORDER>
</GV8APIDATA>')) test
and I get more or less the expected result.
4. Full Spark SQL test
Using the from_xml and schema_of_xml functions:
SELECT from_xml(GV8Data, schema_of_xml('STRUCT<ORDER: STRUCT<INSTSPECIFIER: STRUCT<_ExternalInstID: BIGINT, _FirstSequenceID: BIGINT, _FirstSequenceItemID: BIGINT, _FirstSequenceItemName: STRING, _InstID: BIGINT, _InstName: STRING, _SecondSequenceItemID: BIGINT, _SecondSequenceItemName: STRING, _SeqSpan: STRING, _TermFormatID: BIGINT>, _Action: STRING, _AllOrNone: BOOLEAN, _Broker: STRING, _BrokerID: BIGINT, _Company: STRING, _CompanyID: BIGINT, _CounterPartyOk: BOOLEAN, _DEA: BOOLEAN, _DateTime: TIMESTAMP, _DateTimeNanoSecondsPart: BIGINT, _DerivativeIndicator: BOOLEAN, _EngineID: BIGINT, _ExecutionMaker: BIGINT, _HiddenVolume: BIGINT, _ImpliedType: STRING, _IsMarketData: BOOLEAN, _IsOwnData: BOOLEAN, _IsTradable: STRING, _LiquidityProvision: BOOLEAN, _OldEngineID: BIGINT, _OldOrderID: BIGINT, _OrderDealt: BOOLEAN, _OrderID: BIGINT, _OrderType: STRING, _PersistentOrderID: BIGINT, _Price: DOUBLE, _PriceDelta: DOUBLE, _ProductClassification: STRING, _Side: STRING, _Status: STRING, _SystemRank: BIGINT, _Trader: STRING, _TraderID: BIGINT, _TradingCapacity: STRING, _User: STRING, _UserID: BIGINT, _Volume: BIGINT>, _xmlns: STRING, `_xmlns:xsd`: STRING, `_xmlns:xsi`: STRING>')) test
FROM PARQUET.`path_to_file/file.parquet`
and this is where everything starts to go wrong.
5. PySpark test
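A likely culprit in this query: `schema_of_xml` infers a schema from an XML *sample*, whereas the string passed to it here is already a DDL schema (the output of the quick test). Passing that DDL string directly as the second argument of `from_xml` is probably what was intended. To produce such a DDL string without Spark, here is a minimal Python sketch with hypothetical helpers `local_name` and `ddl_for`. It mirrors the layout of the inferred schema above: attributes get a `_` prefix, child elements become nested STRUCTs, repeated children become ARRAYs, and fields are sorted by name. Typing every leaf as STRING is a simplifying assumption, so cast afterwards or edit the types by hand. ElementTree does not report `xmlns` declarations as attributes, so the `_xmlns` fields are omitted.

```python
import xml.etree.ElementTree as ET


def local_name(tag):
    """Strip the '{namespace}' part ElementTree adds to namespaced names."""
    return tag.rsplit("}", 1)[-1]


def ddl_for(element, attr_prefix="_"):
    """Build a Spark-style DDL type for one element; every leaf is STRING (assumption)."""
    # Attributes become '_<name>' fields, like the schema inferred in the quick test.
    fields = [(attr_prefix + local_name(name), "STRING") for name in element.attrib]
    # Group child elements by local name; repeated children become ARRAYs.
    groups = {}
    for child in element:
        groups.setdefault(local_name(child.tag), []).append(child)
    for name, children in groups.items():
        inner = ddl_for(children[0], attr_prefix)
        fields.append((name, f"ARRAY<{inner}>" if len(children) > 1 else inner))
    if not fields:
        return "STRING"  # element without attributes or children: plain text value
    # Sort by field name, matching the ordering of the inferred schema above.
    fields.sort(key=lambda field: field[0])
    return "STRUCT<" + ", ".join(f"`{name}`: {dtype}" for name, dtype in fields) + ">"


sample = ('<GV8APIDATA xmlns="gv8api-trayport-com"><ORDER OrderID="52389917" Price="169.50">'
          '<INSTSPECIFIER InstID="10641712"/></ORDER></GV8APIDATA>')
print(ddl_for(ET.fromstring(sample)))
# STRUCT<`ORDER`: STRUCT<`INSTSPECIFIER`: STRUCT<`_InstID`: STRING>, `_OrderID`: STRING, `_Price`: STRING>>
```

The printed string has the same shape as the schema in the query above and could be used as `from_xml(GV8Data, '<ddl string>')` in SQL.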
from pyspark.sql.functions import from_xml
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, TimestampType
# INSTSPECIFIER
inst_specifier_schema = StructType([
StructField("InstID", StringType(), True),
StructField("InstName", StringType(), True),
StructField("FirstSequenceID", StringType(), True),
StructField("SeqSpan", StringType(), True),
StructField("FirstSequenceItemID", StringType(), True),
StructField("SecondSequenceItemID", StringType(), True),
StructField("FirstSequenceItemName", StringType(), True),
StructField("SecondSequenceItemName", StringType(), True),
StructField("TermFormatID", StringType(), True),
StructField("ExternalInstID", StringType(), True)
])
# ORDER
order_schema = StructType([
StructField("EngineID", StringType(), True),
StructField("PersistentOrderID", StringType(), True),
StructField("OrderID", StringType(), True),
StructField("OldEngineID", StringType(), True),
StructField("OldOrderID", StringType(), True),
StructField("Action", StringType(), True),
StructField("DateTime", StringType(), True),
StructField("DateTimeNanoSecondsPart", StringType(), True),
StructField("Price", FloatType(), True),
StructField("Volume", StringType(), True),
StructField("HiddenVolume", StringType(), True),
StructField("PriceDelta", FloatType(), True),
StructField("Side", StringType(), True),
StructField("Status", StringType(), True),
StructField("Company", StringType(), True),
StructField("CompanyID", StringType(), True),
StructField("Broker", StringType(), True),
StructField("BrokerID", StringType(), True),
StructField("User", StringType(), True),
StructField("UserID", StringType(), True),
StructField("Trader", StringType(), True),
StructField("TraderID", StringType(), True),
StructField("OrderType", StringType(), True),
StructField("AllOrNone", BooleanType(), True),
StructField("CounterPartyOk", BooleanType(), True),
StructField("SystemRank", StringType(), True),
StructField("ImpliedType", StringType(), True),
StructField("IsTradable", StringType(), True),
StructField("OrderDealt", BooleanType(), True),
StructField("TradingCapacity", StringType(), True),
StructField("ExecutionMaker", StringType(), True),
StructField("DerivativeIndicator", BooleanType(), True),
StructField("DEA", BooleanType(), True),
StructField("LiquidityProvision", BooleanType(), True),
StructField("ProductClassification", StringType(), True),
StructField("IsMarketData", BooleanType(), True),
StructField("IsOwnData", BooleanType(), True),
StructField("INSTSPECIFIER", inst_specifier_schema, True)
])
# root
gv8api_data_schema = StructType([
StructField("ORDER", order_schema, True)
])
df = spark.read.parquet("path_to_file/file.parquet")
parsed_df = df.withColumn("parsed_xml", from_xml("GV8Data",schema=order_schema))
I get this cryptic error:
Error in callback <bound method UserNamespaceCommandHook.post_run_cell of <dbruntime.DatasetInfo.UserNamespaceCommandHook object at 0x7fe060e563e0>> (for post_run_cell):
with no further details.
What am I missing?
And yes, I'm not restricted to any particular language.
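Judging from the schema inferred in the quick SQL test, two details of the PySpark attempt may be worth checking. First, `from_xml` maps the *root* element of each string (`GV8APIDATA`) to the top-level struct, so the schema passed should probably be `gv8api_data_schema` (with `ORDER` inside), not `order_schema`. Second, attributes are exposed with a `_` prefix (`_OrderID`, `_InstID`, ...), so bare field names such as `OrderID` would presumably not match. The plain-Python sketch below (hypothetical helper `to_spark_style`; this is not Spark code) illustrates that naming on a trimmed sample.

```python
import xml.etree.ElementTree as ET


def local_name(tag):
    """Strip the '{namespace}' part ElementTree adds to namespaced names."""
    return tag.rsplit("}", 1)[-1]


def to_spark_style(element, attr_prefix="_"):
    """Nested dict keyed like Spark's XML fields: '_<attr>' for attributes, local tag for children."""
    record = {attr_prefix + local_name(name): value for name, value in element.attrib.items()}
    for child in element:
        key, value = local_name(child.tag), to_spark_style(child, attr_prefix)
        if key not in record:
            record[key] = value
        elif isinstance(record[key], list):
            record[key].append(value)  # third and later repeats
        else:
            record[key] = [record[key], value]  # second occurrence: switch to a list
    return record


sample = ('<GV8APIDATA xmlns="gv8api-trayport-com"><ORDER OrderID="52389917">'
          '<INSTSPECIFIER InstID="10641712"/></ORDER></GV8APIDATA>')
record = to_spark_style(ET.fromstring(sample))
print(record)
# {'ORDER': {'_OrderID': '52389917', 'INSTSPECIFIER': {'_InstID': '10641712'}}}
```

The top level holds `ORDER`, not the order attributes, and every attribute key carries the `_` prefix. A PySpark `StructType` would need the same layout to line up with what `from_xml` produces by default.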





I think this is the ugliest code I've ever written. It doesn't answer my question, but it is a working solution. It would be nice to make it more efficient, though, if anyone knows how.
Basically, I use a UDF to convert the XML to JSON with xmltodict and json, and then extract the struct from the JSON string with from_json.
import xmltodict, json
from pyspark.sql.functions import from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, FloatType, BooleanType
# UDF to convert xml to json
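def xmlparse(data):
    # Null GV8Data values would make xmltodict.parse fail, so pass them through as null
    if data is None:
        return None
    # Keep only the ORDER element (attributes become "@Name" keys) and serialize it to JSON
    result = json.dumps(xmltodict.parse(data)["GV8APIDATA"]["ORDER"])
    return result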
xmlparse_udf = udf(xmlparse, StringType())
# Json schema
# INSTSPECIFIER element
inst_specifier_schema = StructType([
StructField("@InstID", StringType(), True),
StructField("@InstName", StringType(), True),
StructField("@FirstSequenceID", StringType(), True),
StructField("@SeqSpan", StringType(), True),
StructField("@FirstSequenceItemID", StringType(), True),
StructField("@SecondSequenceItemID", StringType(), True),
StructField("@FirstSequenceItemName", StringType(), True),
StructField("@SecondSequenceItemName", StringType(), True),
StructField("@TermFormatID", StringType(), True),
StructField("@ExternalInstID", StringType(), True)
])
# ORDER element
order_schema = StructType([
StructField("@EngineID", StringType(), True),
StructField("@PersistentOrderID", StringType(), True),
StructField("@OrderID", StringType(), True),
StructField("@OldEngineID", StringType(), True),
StructField("@OldOrderID", StringType(), True),
StructField("@Action", StringType(), True),
StructField("@DateTime", StringType(), True),
StructField("@DateTimeNanoSecondsPart", StringType(), True),
StructField("@Price", FloatType(), True),
StructField("@Volume", StringType(), True),
StructField("@HiddenVolume", StringType(), True),
StructField("@PriceDelta", FloatType(), True),
StructField("@Side", StringType(), True),
StructField("@Status", StringType(), True),
StructField("@Company", StringType(), True),
StructField("@CompanyID", StringType(), True),
StructField("@Broker", StringType(), True),
StructField("@BrokerID", StringType(), True),
StructField("@OldBrokerID", StringType(), True),
StructField("@User", StringType(), True),
StructField("@UserID", StringType(), True),
StructField("@Trader", StringType(), True),
StructField("@TraderID", StringType(), True),
StructField("@OrderType", StringType(), True),
StructField("@AllOrNone", BooleanType(), True),
StructField("@CounterPartyOk", BooleanType(), True),
StructField("@ImpliedType", StringType(), True),
StructField("@IsTradable", StringType(), True),
StructField("@OrderDealt", BooleanType(), True),
StructField("@Execution", StringType(), True),
StructField("@IsMarketData", BooleanType(), True),
StructField("@IsOwnData", BooleanType(), True),
StructField("INSTSPECIFIER", inst_specifier_schema, True)
])
# Execution
df = spark.read.parquet("path_to_file/file.parquet")
parsed_df = df.withColumn("GV8Data_json", xmlparse_udf("GV8Data")).withColumn("GV8Data_struct", from_json("GV8Data_json",schema=order_schema))
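One way to drop the `xmltodict` dependency and also handle typing is a small standard-library converter. `xmltodict` returns every attribute as a string, so the JSON contains `"169.50"` and `"false"` in quotes. Spark's JSON parser may not accept quoted values for `FloatType` / `BooleanType` fields, which can leave them null. The sketch below (hypothetical names `FLOAT_ATTRS`, `BOOL_ATTRS`, `convert`, `element_to_dict`) produces the same `@`-prefixed keys the schema above expects and converts those attributes before `json.dumps`. It assumes a single ORDER and a single INSTSPECIFIER per document, like the original code. It could be registered with `udf(xmlparse, StringType())` exactly like the original function. No claim is made that it is faster.

```python
import json
import xml.etree.ElementTree as ET

# Attributes (without '@') that the answer's order_schema types as FloatType / BooleanType.
FLOAT_ATTRS = {"Price", "PriceDelta"}
BOOL_ATTRS = {"AllOrNone", "CounterPartyOk", "OrderDealt", "IsMarketData", "IsOwnData"}


def convert(name, value):
    """Turn an attribute string into a JSON number/boolean where the schema expects one."""
    if name in FLOAT_ATTRS:
        return float(value) if value else None
    if name in BOOL_ATTRS:
        return value.strip().lower() == "true" if value else None
    return value


def element_to_dict(element):
    """xmltodict-like view: attributes under '@name', children under their local tag name."""
    result = {"@" + name: convert(name, value) for name, value in element.attrib.items()}
    for child in element:
        # A repeated child tag would overwrite the previous one (single-child assumption).
        result[child.tag.rsplit("}", 1)[-1]] = element_to_dict(child)
    return result


def xmlparse(data):
    """Return the first ORDER element of a GV8APIDATA document as JSON (None for null input)."""
    if data is None:
        return None
    root = ET.fromstring(data)
    order = next((c for c in root if c.tag.rsplit("}", 1)[-1] == "ORDER"), None)
    return None if order is None else json.dumps(element_to_dict(order))
```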
Sorry for the late reply.
You can use the native scala.xml package to do the parsing, for example with the code below.
You can then compile it into a fat JAR (or a package) and add it to your Spark job.
import spark.implicits._
case class ORDER(
  id: String
)
def parse(message: String): Iterator[ORDER] = {
  // Parse the XML string and emit one ORDER per <ORDER> child element
  val node = scala.xml.XML.loadString(message)
  (node \ "ORDER").iterator.map(elem => {
    val id = (elem \ "@OrderID").text
    ORDER(id)
  })
}
val message = "<GV8APIDATA\n xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\"\n xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n xmlns=\"gv8api-trayport-com\">\n <ORDER EngineID=\"1\" PersistentOrderID=\"60076044\" OrderID=\"52389917\" OldEngineID=\"0\" OldOrderID=\"0\" Action=\"Remove\" DateTime=\"2023-10-14T00:16:33.0467795Z\" DateTimeNanoSecondsPart=\"0\" Price=\"169.50\" Volume=\"1\" HiddenVolume=\"0\" PriceDelta=\"0.00\" Side=\"Ask\" Status=\"Withheld\" Company=\"Company\" CompanyID=\"278\" Broker=\"ICAP\" BrokerID=\"4\" User=\"User\" UserID=\"1910\" Trader=\"User\" TraderID=\"1910\" OrderType=\"GoodTillCancelled\" AllOrNone=\"false\" CounterPartyOk=\"false\" SystemRank=\"55936541\" ImpliedType=\"None\" IsTradable=\"No\" OrderDealt=\"false\" TradingCapacity=\"DEAL\" ExecutionMaker=\"19850428\" DerivativeIndicator=\"false\" DEA=\"false\" LiquidityProvision=\"true\" ProductClassification=\"RM - MIFID\" IsMarketData=\"true\" IsOwnData=\"true\">\n <INSTSPECIFIER InstID=\"10641712\" InstName=\"Germany Peaks EEX\" FirstSequenceID=\"10000104\" SeqSpan=\"Single\" FirstSequenceItemID=\"240\" SecondSequenceItemID=\"0\" FirstSequenceItemName=\"Dec-23\" SecondSequenceItemName=\"\" TermFormatID=\"2906593310\" ExternalInstID=\"10641712\"/>\n </ORDER>\n</GV8APIDATA>"
val ds = Seq(message).toDS.flatMap(parse)
ds.show(false)
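For readers who stay in Python, the same extraction can be sketched with the standard library (hypothetical `parse_orders`; as in the Scala code, only `OrderID` is read). Unlike the label-based `\ "ORDER"` lookup in scala.xml, ElementTree matches on the namespace-qualified name, so the default namespace has to be spelled out. Wiring this into PySpark (for example through a UDF returning an array, or `rdd.flatMap`) is left out.

```python
import xml.etree.ElementTree as ET
from typing import Iterator

NS = "{gv8api-trayport-com}"  # default namespace declared on GV8APIDATA


def parse_orders(message: str) -> Iterator[dict]:
    """Yield one {'id': OrderID} dict per direct ORDER child, like the Scala parse() above."""
    root = ET.fromstring(message)
    for order in root.findall(NS + "ORDER"):
        # Missing attribute gives "", matching (elem \ "@OrderID").text in Scala
        yield {"id": order.get("OrderID", "")}
```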
Are you able to compile Scala code? I also work with this Trayport data and wrote a method in Scala, compiled into a fat JAR.