моя лямбда-функция запускает работу по склеиванию с помощью boto3 Glue.start_job_run
а вот мой скрипт работы с клеем
from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max
conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)
args = getResolvedOptions(sys.argv,['s3_target_path_key','s3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = f"s3a://glu-job-final-juiceb"
print(bucket, fileName)
rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x : (x.split(" ")[0], 1)).reduceByKey(add)
df = rdd.toDF(schema=('rawEntities string, Count int'))
df = df.withColumn("Entities", regexp_extract(col("rawEntities"),'[^!".?@:,\'*…_()]+',0))
df = df.filter(col("Entities") != "")
df = df.select("Entities","Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)
Сообщение об ошибке задания Glue: «AttributeError: объект PipelinedRDD не имеет атрибута toDF».
Погуглив, я заметил, что в клее «toDF» означает DynamicFrame to DataFrame.
Это не означает RDD для DataFrame.
Как я могу преобразовать RDD в DataFrame в клею?
Вы не можете определить типы схемы, используя toDF()
. Используя метод toDF()
, мы не можем контролировать настройку схемы. Сказав это, используя метод createDataFrame()
, мы имеем полный контроль над настройкой схемы.
См. ниже логику -
from pyspark.sql.types import *
schema = StructType([ StructField('rawEntities', StringType()), StructField('Count' , IntegerType())])
df = spark.createDataFrame(data=<your rdd>, schema = schema)
Не уверен насчет aws-glue
. Но мы придерживаемся аналогичного подхода при использовании pyspark
как локально, так и Databricks
.
Это конкретно для
aws-glue
? Стандартная искра имеет параметрschema
наRDD.toDF()
. Кроме того, ошибкаAttributeError