Данные JSON, хранящиеся как нулевые значения в таблице Delta Lake с использованием PySpark

Я столкнулся с проблемой при попытке сохранить данные JSON в виде таблицы Delta Lake с помощью PySpark и Delta Lake.

Вот мой код:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *

delta_version = "2.4.0" 

spark = SparkSession.builder \
    .appName("JSONToDeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", f"io.delta:delta-core_2.12:{delta_version}") \
    .getOrCreate()

json_data = """
[
    {
        "name": "John Doe",
        "age": 30,
        "city": "New York"
    },
    {
        "name": "Jane Smith",
        "age": 25,
        "city": "Los Angeles"
    }
]
"""

json_path = "example_data.json"
with open(json_path, "w") as file:
    file.write(json_data)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

try:
    df = spark.read.schema(schema).json(json_path)
except Exception as e:
    print(f"Error reading JSON file: {e}")
    spark.stop()
    exit(1)

df.printSchema()
df.show()

delta_path = "example_delta_table"
df.write.format("delta").mode("overwrite").save(delta_path)
delta_table = DeltaTable.forPath(spark, delta_path)
delta_df = delta_table.toDF()
delta_df.show()

spark.stop()

Этот код создает пример данных JSON, сохраняет их в файл, считывает данные JSON с помощью PySpark, а затем сохраняет их как таблицу Delta Lake.

Однако когда я запускаю код, в таблице Delta Lake сохраняются только нулевые значения.

& C:/Users/no2si/AppData/Local/Programs/Python/Python311/python.exe c:/Users/no2si/Documents/MarketReSearch/TodayhomeScrape/deltalpp.py
:: loading settings :: url = jar:file:/C:/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\no2si\.ivy2\cache
The jars for the packages stored in: C:\Users\no2si\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.4.0 in central
        found io.delta#delta-storage;2.4.0 in central
        found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 173ms :: artifacts dl 8ms
        :: modules in use:
        io.delta#delta-core_2.12;2.4.0 from central in [default]
        io.delta#delta-storage;2.4.0 from central in [default]
        org.antlr#antlr4-runtime;4.9.3 from central in [default]
        ---------------------------------------------------------------------        
        |                  |            modules            ||   artifacts   |        
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|        
        ---------------------------------------------------------------------        
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |        
        ---------------------------------------------------------------------        
:: retrieving :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/10ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)

+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+

24/06/07 13:58:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+

24/06/07 13:58:56 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:108)
        at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2175)   
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1509)        
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2175)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2081)
        at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:550)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)        
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)      
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:834)
24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)       
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)      
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)        
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
PS C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape> 24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)       
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)      
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)        
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
SUCCESS: The process with PID 7444 (child process of PID 14792) has been terminated.
SUCCESS: The process with PID 14792 (child process of PID 13788) has been terminated.
SUCCESS: The process with PID 13788 (child process of PID 1864) has been terminated. 

Что мне следует изменить, чтобы решить эту проблему? Я был бы признателен, если бы вы рассказали мне, как правильно хранить данные JSON в таблице Delta Lake.

Кроме того, я также был бы признателен за любые советы о причинах и решениях предупреждений и сообщений об ошибках, появляющихся в журналах.

Спасибо.

Я попытался убедиться, что файл JSON загружается правильно и указанный путь к хранилищу действителен.

Чтобы проверить, правильно ли загружается файл JSON, я добавил код для печати содержимого файла JSON после его сохранения. Это позволило мне убедиться, что данные JSON записываются в файл, как и ожидалось.

df.show() ничего не показывает, значит, оно не было загружено должным образом. Я предполагаю, что ему не нравится [], т. е. вы сохранили как массив. Попробуйте по одному объекту JSON в строке, без форматирования, без [], без запятых между объектами.
Alexander Pavlov 07.06.2024 21:16
Как сделать HTTP-запрос в Javascript?
Как сделать HTTP-запрос в Javascript?
В JavaScript вы можете сделать HTTP-запрос, используя объект XMLHttpRequest или более новый API fetch. Вот пример для обоих методов:
0
1
56
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Ваш json-файл использует более одной строки, поэтому вам нужно использовать многострочный параметр для чтения файла. Добавьте .option("multiline", "true") при чтении файла json.

df = spark.read.schema(schema).option("multiline", "true").json(json_path)

Использованная литература:

Ошибка _corrupt_record при чтении файла JSON в Spark

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html

Он работает идеально, я хорошо разбираюсь в Java и тестирую его как Dataset<Row> df = spark.read().schema(schema).option("multiline", "true").json("C:\\0001.TempWS\\example_data.json");

jornathan 01.07.2024 01:28

Другие вопросы по теме