У меня есть таблица, в которой есть данные, как показано на диаграмме. Я хочу создать результаты хранения в динамически генерируемых именах фреймов данных.
Например, здесь, в приведенном ниже примере, я хочу создать два разных имени фрейма данных. dnb_df и es_df и сохранить результат чтения в этих двух фреймах и распечатать структуру каждого фрейма данных
Когда я запускаю приведенный ниже код, получаю ошибку
SyntaxError: невозможно назначить оператору (TestGlue2.py, строка 66)
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])
#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url = "jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row in vendor_data :
vendor_query=row.SRC_QUERY
row.VENDOR_NAME+'_df'= spark.read.format("jdbc").option("url",
Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
Oracle_Username).option("password", Oracle_Password).load()
print(row.VENDOR_NAME+'_df')
Добавлен вариант использования на картинке
Отвечает ли это на ваш вопрос? SyntaxError: невозможно назначить оператору
Это дает мне ошибку в последней строке, где я пытаюсь распечатать результат фрейма данных
@UpmostScarab Я ищу способ динамически генерировать имена фреймов данных, а затем использовать эти имена для печати результата, хранящегося в фреймах данных.
Вы хотите присвоить фрейму данных разные динамически генерируемые имена, если я правильно понял? В этом случае я не думаю, что это относится к искре, а к Python в целом.
Кроме того, я не думаю, что вам следует назначать Dataframe для строки, вместо этого вы можете преобразовать строку как dict
Привет @ronak Я получаю сообщение об ошибке при присоединении, в реальности у меня есть другой столбец org_code, на основе которого я присоединяюсь AnalysisException: USING column ORG_CODE cannot be resolved on the left side of the join. The left-side columns: [VENDOR_NAME, SRC_QUERY, VENDOR_EXECUTION_ORDER]
Кажется, столбец отсутствует в левом фрейме данных, или, может быть, вы используете неправильный фрейм данных, можете ли вы поделиться кодом?
Да, Ронак, я определил проблему. Это была глупая ошибка. Извини за это :)
Обновление: как обсуждалось в комментариях, ваше требование состоит в том, чтобы в дальнейшем присоединиться ко всем другим кадрам данных.
for row in vendor_data:
rowAsDict=row.asDict()
# Here you can use any variable as rowAsDict is not going to be used anywhere else anyway
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
main_dataframe=main_dataframe.join(rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"], "acc_id")
Ввод main_dataframe
:
source_df
:
View1
и View2
:
Вывод main_dataframe
Если я правильно понял, вам нужно генерировать VENDOR_NAME_DF
динамически.
Вы не сможете назначать объект Row, и при этом не будет полезно назначать dataframe строке, поскольку вы не можете создать Dataframe со столбцом типа Dataframe.
Тем не менее, вы можете преобразовать строку в словарь с помощью asDict
и использовать его вместо этого.
Это будет работать:
vendor_data=source_df.collect()
for row in vendor_data:
rowAsDict=row.asDict()
# Replace this with spark.read() or any way to create a Dataframe
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
Входной источник_DF:
Результат SOURCE_QUERY:
Выход (из rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
):
Последняя строка AsDict:
{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}
На самом деле, после получения данных в кадрах данных, я хотел объединить их вместе. Возможно ли это?
Вы хотите объединить все собранные кадры данных вместе или объединить их с кадром данных source_df?
Я хочу присоединиться к фрейму данных source_df. Также я хотел бы упомянуть, что в настоящее время я запускаю и заполняю все фреймы данных поставщиков в цикле. Вскоре я изменю его, чтобы обрабатывать, где все фреймы данных поставщиков будут заполняться параллельно, а затем присоединять их все к исходному фрейму данных. надеюсь, что это поможет. дайте мне знать, если все еще есть путаница, я обновлю свой вопрос более подробно.
Есть ли общий идентификатор, с которым мы можем присоединиться к этим кадрам данных?
Привет, Ронак. Я отредактировал свой вопрос и добавил вариант использования в формате Excel.
позвольте мне попробовать это и дам вам знать .. спасибо
будет ли это решение работать, когда я попытаюсь создать фрейм данных каждого поставщика параллельно, а затем выполнить соединение
@purnimaBhatia Вероятно, да, поскольку порядок присоединения не имеет значения. Но я не очень хорошо разбираюсь в Python, поэтому не могу комментировать.
Добавьте последние две строки в цикл for, вы сможете получить результаты. Во-первых, создается временная таблица с использованием динамического имени df. Во-вторых, показать данные в этой временной таблице.
for row in vendor_data :
vendor_query=row.SRC_QUERY
spark.read.format("jdbc").option("url",
Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
Oracle_Username).option("password", Oracle_Password).load().createOrReplaceTempView(row.VENDOR_NAME+'_df')
spark.sql("select * from "+row.VENDOR_NAME+"_df").show()
можно ли впоследствии объединить эти временные представления?
Да, они могут быть, они такие же, как кадры данных, единственная разница в том, что мы используем искровой API для соединения и выполнения других преобразований, когда это фрейм данных. Когда это временная таблица, вы можете выполнять те же преобразования, используя обычный sql, используя spark.sql
Что такое строка 66?