Я пытаюсь перенести большой объем данных из экземпляра RDS Postgres в VPC в кластер красного смещения в том же VPC. Я пытаюсь сделать это с помощью PySpark и AWS Glue. Я хочу перенести данные только за последние 6 месяцев, однако мой запрос, похоже, выполняет загрузку всей рассматриваемой таблицы, а затем фильтрует ее, что вызывает сбои памяти. Вот код, который у меня есть до сих пор:
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "table")
datasource0.printSchema()
filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())
Можно ли как-то применить этот фильтр к запросу загрузки? Этот путь в настоящее время пытается select * from table
, и я бы хотел, чтобы вместо этого select * from table where scandate > "2020-05-31"
@mck, к сожалению, мои данные в Postgres не разделены, поэтому использование предиката push down не сработает.
или вы можете использовать jdbc-ридер pyspark? вы можете прочитать запрос в фрейме данных с помощью jdbc reader
Мне нужно использовать клеевое соединение AWS, потому что моя база данных находится в облаке VPC.
В итоге я просто использовал службу миграции баз данных AWS. На самом деле было довольно безболезненно
Попробуйте этот ответ? stackoverflow.com/a/50294863/14165730