I ran into a problem while trying to save JSON data as a Delta Lake table using PySpark and Delta Lake.
Here is my code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *

delta_version = "2.4.0"

# Spark session with the Delta Lake extension and the delta-core package
spark = SparkSession.builder \
    .appName("JSONToDeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", f"io.delta:delta-core_2.12:{delta_version}") \
    .getOrCreate()

# Sample JSON data written to a local file
json_data = """
[
    {
        "name": "John Doe",
        "age": 30,
        "city": "New York"
    },
    {
        "name": "Jane Smith",
        "age": 25,
        "city": "Los Angeles"
    }
]
"""

json_path = "example_data.json"
with open(json_path, "w") as file:
    file.write(json_data)

# Explicit schema for the JSON records
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# Read the JSON file with the explicit schema
try:
    df = spark.read.schema(schema).json(json_path)
except Exception as e:
    print(f"Error reading JSON file: {e}")
    spark.stop()
    exit(1)

df.printSchema()
df.show()

# Write the DataFrame as a Delta table, then read it back
delta_path = "example_delta_table"
df.write.format("delta").mode("overwrite").save(delta_path)

delta_table = DeltaTable.forPath(spark, delta_path)
delta_df = delta_table.toDF()
delta_df.show()

spark.stop()
This code creates sample JSON data, saves it to a file, reads the JSON with PySpark, and then writes it out as a Delta Lake table.
However, when I run the code, only null values end up in the Delta Lake table. Here is the full console output:
& C:/Users/no2si/AppData/Local/Programs/Python/Python311/python.exe c:/Users/no2si/Documents/MarketReSearch/TodayhomeScrape/deltalpp.py
:: loading settings :: url = jar:file:/C:/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\no2si\.ivy2\cache
The jars for the packages stored in: C:\Users\no2si\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd;1.0
confs: [default]
found io.delta#delta-core_2.12;2.4.0 in central
found io.delta#delta-storage;2.4.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 173ms :: artifacts dl 8ms
:: modules in use:
io.delta#delta-core_2.12;2.4.0 from central in [default]
io.delta#delta-storage;2.4.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/10ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- city: string (nullable = true)
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+
24/06/07 13:58:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+
24/06/07 13:58:56 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:108)
at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2175)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1509)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2175)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2081)
at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:550)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:834)
24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
PS C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape> 24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
SUCCESS: The process with PID 7444 (child process of PID 14792) has been terminated.
SUCCESS: The process with PID 14792 (child process of PID 13788) has been terminated.
SUCCESS: The process with PID 13788 (child process of PID 1864) has been terminated.
What should I change to fix this? I would appreciate an explanation of how to store JSON data correctly in a Delta Lake table.
I would also appreciate any advice on the causes of, and fixes for, the warnings and error messages that appear in the logs.
Thanks.
I have already verified that the JSON file is written correctly and that the target storage path is valid.
To check that the JSON file is written correctly, I added code that prints the contents of the JSON file after saving it. This confirmed that the JSON data is written to the file as expected.
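The check itself was just a plain read of the file after writing it, roughly this:

with open(json_path, "r") as file:
    print(file.read())  # the JSON array appears exactly as written above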
Your JSON file spans more than one line, so you need to use the multiline option when reading it. Add .option("multiline", "true") when reading the JSON file:
df = spark.read.schema(schema).option("multiline", "true").json(json_path)
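For context, here is a minimal sketch of how the read/write from the question could look with that option applied (reusing spark, schema, json_path and delta_path as defined in the question):

# Read the pretty-printed JSON array as a single document, using the explicit schema
df = (
    spark.read
        .schema(schema)
        .option("multiline", "true")  # required because the file is a multi-line JSON array
        .json(json_path)
)
df.show()  # should now show the two rows instead of nulls

# Write to Delta exactly as before
df.write.format("delta").mode("overwrite").save(delta_path)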
References:
_corrupt_record error when reading a JSON file into Spark
It works perfectly. I know Java well and tested it as Dataset<Row> df = spark.read().schema(schema).option("multiline", "true").json("C:\\0001.TempWS\\example_data.json");
df.show() shows nothing, which means the data was not loaded properly. My guess is that it does not like the [], i.e. you saved the data as a JSON array. Try one JSON object per line, with no formatting, no [], and no commas between the objects.
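If you go that route, the file becomes JSON Lines (one object per line), which Spark's default JSON reader handles without the multiline option. A minimal sketch, reusing json_path and schema from the question:

import json

records = [
    {"name": "John Doe", "age": 30, "city": "New York"},
    {"name": "Jane Smith", "age": 25, "city": "Los Angeles"},
]

# One JSON object per line: no surrounding [] and no commas between objects
with open(json_path, "w") as file:
    for record in records:
        file.write(json.dumps(record) + "\n")

df = spark.read.schema(schema).json(json_path)  # no multiline option needed
df.show()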