Я пытаюсь прочитать данные из таблицы Postgres с помощью Spark. Изначально я читал данные в одном потоке без использования lowerBound
, upperBound
, partitionColumn
и numPartitions
. Данные, которые я читаю, огромны, около 120 миллионов записей. Поэтому я решил читать данные параллельно, используя partitionColumn
. Я могу читать данные, но чтение их 12 параллельными потоками занимает больше времени, чем одним потоком. Я не могу понять, как я могу увидеть 12 SQL-запросов, которые генерируются для параллельного извлечения данных для каждого раздела.
Код, который я использую:
val query = s"(select * from db.testtable) as testquery"
val df = spark.read
.format("jdbc")
.option("url", jdbcurl)
.option("dbtable", query)
.option("partitionColumn","transactionbegin")
.option("numPartitions",12)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", 50000)
.option("user","user")
.option("password", "password")
.option("lowerBound","2019-01-01 00:00:00")
.option("upperBound","2019-12-31 23:59:00")
.load
df.count()
Где и как я могу увидеть 12 параллельных запросов, которые создаются для параллельного чтения данных в каждом потоке?
Я вижу, что в пользовательском интерфейсе Spark создается 12 задач, но не могу найти способ увидеть, какие отдельные 12 запросов генерируются для параллельного извлечения данных из таблицы Postgres.
Можно ли каким-либо образом опустить фильтр, чтобы он считывал данные только за этот год, в данном случае за 2019 год.
Это не совсем несколько запросов, но на самом деле он покажет план выполнения, оптимизированный Spark на основе ваших запросов. Это может быть не идеально в зависимости от этапов, которые вам нужно выполнить.
Вы можете написать свой даг в виде DataFrame, и перед фактическим вызовом действия вы можете использовать для него метод explain()
. Читать ее может быть сложно, но она перевернута. Источник находится внизу, пока читаешь это. Это может показаться немного необычным, если вы попытаетесь читать, поэтому начните с базовых преобразований и продвигайтесь шаг за шагом, если вы читаете впервые.
Я не знаю, что вы имеете в виду под дополнительной информацией, а также не уверен, почему параллельное чтение может занять больше времени. Это также зависит от объема данных. Если данных очень мало для целей обучения, может случиться так, что один поток может быть быстрее. Кроме того, в плане, если вы видите операцию обмена, это может быть причиной такой задержки. Вы также можете увидеть, по какому столбцу он разделяет данные в этом плане. Я предполагаю, что это будет RoundRobin, что означает, что это в основном случайно.
Оператор SQL печатается с использованием уровня журнала «info», см. здесь . Вам нужно изменить уровень журнала Spark на «информация», чтобы увидеть SQL. Кроме того, он напечатал только условие where , как и здесь.
Вы также можете просмотреть SQL в своей базе данных Postgresql, используя представление pg_stat_statements
, которое требует установки отдельного плагина. Есть способ зарегистрировать SQL-запросы и просмотреть их, как , упомянутое здесь .
Я подозреваю, что параллелизм для вас медленный, потому что в столбце «transactionbegin» вашей таблицы нет индекса. partitionColumn
должен быть проиндексирован, иначе он будет сканировать всю таблицу во всех этих параллельных сеансах, которые задохнутся.
Спасибо за ответ @Salim. На самом деле я сам выяснил, что некоторые из них связаны с pg_stat_statements, но все же то, что вы упомянули, похоже на то, для чего я искал информацию. Я обязательно попробую.
какое объяснение даст. он просто указывает план, но не дает дополнительной информации о запросах. Я хотел знать, почему при параллельном чтении требуется больше времени, а при чтении в одном потоке - меньше времени.