Как динамически генерировать имя динамического фрейма Pyspark

У меня есть таблица, в которой есть данные, как показано на диаграмме. Я хочу создать результаты хранения в динамически генерируемых именах фреймов данных.

Например, здесь, в приведенном ниже примере, я хочу создать два разных имени фрейма данных. dnb_df и es_df и сохранить результат чтения в этих двух фреймах и распечатать структуру каждого фрейма данных

Когда я запускаю приведенный ниже код, получаю ошибку

SyntaxError: невозможно назначить оператору (TestGlue2.py, строка 66)


import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col


args = getResolvedOptions(sys.argv, ['JOB_NAME'])





sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session

#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)



client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])

#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url = "jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row  in vendor_data :
    vendor_query=row.SRC_QUERY
   row.VENDOR_NAME+'_df'= spark.read.format("jdbc").option("url", 
               Oracle_jdbc_url).option("dbtable", vendor_query).option("user", 
            Oracle_Username).option("password", Oracle_Password).load()
    print(row.VENDOR_NAME+'_df')


Добавлен вариант использования на картинке

Что такое строка 66?

UpmostScarab 19.02.2023 02:47

Отвечает ли это на ваш вопрос? SyntaxError: невозможно назначить оператору

UpmostScarab 19.02.2023 02:49

Это дает мне ошибку в последней строке, где я пытаюсь распечатать результат фрейма данных

purnima Bhatia 19.02.2023 03:29

@UpmostScarab Я ищу способ динамически генерировать имена фреймов данных, а затем использовать эти имена для печати результата, хранящегося в фреймах данных.

purnima Bhatia 19.02.2023 03:32

Вы хотите присвоить фрейму данных разные динамически генерируемые имена, если я правильно понял? В этом случае я не думаю, что это относится к искре, а к Python в целом.

Ronak Jain 19.02.2023 05:32

Кроме того, я не думаю, что вам следует назначать Dataframe для строки, вместо этого вы можете преобразовать строку как dict

Ronak Jain 19.02.2023 05:33

Привет @ronak Я получаю сообщение об ошибке при присоединении, в реальности у меня есть другой столбец org_code, на основе которого я присоединяюсь AnalysisException: USING column ORG_CODE cannot be resolved on the left side of the join. The left-side columns: [VENDOR_NAME, SRC_QUERY, VENDOR_EXECUTION_ORDER]

purnima Bhatia 20.02.2023 19:09

Кажется, столбец отсутствует в левом фрейме данных, или, может быть, вы используете неправильный фрейм данных, можете ли вы поделиться кодом?

Ronak Jain 20.02.2023 19:15

Да, Ронак, я определил проблему. Это была глупая ошибка. Извини за это :)

purnima Bhatia 20.02.2023 19:20
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
9
62
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Обновление: как обсуждалось в комментариях, ваше требование состоит в том, чтобы в дальнейшем присоединиться ко всем другим кадрам данных.

for row in vendor_data:
  rowAsDict=row.asDict()
  # Here you can use any variable as rowAsDict is not going to be used anywhere else anyway 
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
  main_dataframe=main_dataframe.join(rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"], "acc_id")

Ввод main_dataframe:

source_df :

View1 и View2:

Вывод main_dataframe

Если я правильно понял, вам нужно генерировать VENDOR_NAME_DF динамически.

Вы не сможете назначать объект Row, и при этом не будет полезно назначать dataframe строке, поскольку вы не можете создать Dataframe со столбцом типа Dataframe.

Тем не менее, вы можете преобразовать строку в словарь с помощью asDict и использовать его вместо этого.

Это будет работать:

vendor_data=source_df.collect()

for row in vendor_data:
  rowAsDict=row.asDict()
  # Replace this with spark.read() or any way to create a Dataframe
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"]) 
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show() 

Входной источник_DF:

Результат SOURCE_QUERY:

Выход (из rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()):

Последняя строка AsDict:

{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}

На самом деле, после получения данных в кадрах данных, я хотел объединить их вместе. Возможно ли это?

purnima Bhatia 19.02.2023 13:03

Вы хотите объединить все собранные кадры данных вместе или объединить их с кадром данных source_df?

Ronak Jain 19.02.2023 13:38

Я хочу присоединиться к фрейму данных source_df. Также я хотел бы упомянуть, что в настоящее время я запускаю и заполняю все фреймы данных поставщиков в цикле. Вскоре я изменю его, чтобы обрабатывать, где все фреймы данных поставщиков будут заполняться параллельно, а затем присоединять их все к исходному фрейму данных. надеюсь, что это поможет. дайте мне знать, если все еще есть путаница, я обновлю свой вопрос более подробно.

purnima Bhatia 19.02.2023 13:48

Есть ли общий идентификатор, с которым мы можем присоединиться к этим кадрам данных?

Ronak Jain 19.02.2023 14:03

Привет, Ронак. Я отредактировал свой вопрос и добавил вариант использования в формате Excel.

purnima Bhatia 19.02.2023 14:47

позвольте мне попробовать это и дам вам знать .. спасибо

purnima Bhatia 19.02.2023 14:55

будет ли это решение работать, когда я попытаюсь создать фрейм данных каждого поставщика параллельно, а затем выполнить соединение

purnima Bhatia 19.02.2023 18:20

@purnimaBhatia Вероятно, да, поскольку порядок присоединения не имеет значения. Но я не очень хорошо разбираюсь в Python, поэтому не могу комментировать.

Ronak Jain 19.02.2023 18:37
Ответ принят как подходящий

Добавьте последние две строки в цикл for, вы сможете получить результаты. Во-первых, создается временная таблица с использованием динамического имени df. Во-вторых, показать данные в этой временной таблице.

for row  in vendor_data :
    vendor_query=row.SRC_QUERY
    spark.read.format("jdbc").option("url", 
               Oracle_jdbc_url).option("dbtable", vendor_query).option("user", 
            Oracle_Username).option("password", Oracle_Password).load().createOrReplaceTempView(row.VENDOR_NAME+'_df')   
    spark.sql("select * from "+row.VENDOR_NAME+"_df").show()
    

можно ли впоследствии объединить эти временные представления?

purnima Bhatia 20.02.2023 13:22

Да, они могут быть, они такие же, как кадры данных, единственная разница в том, что мы используем искровой API для соединения и выполнения других преобразований, когда это фрейм данных. Когда это временная таблица, вы можете выполнять те же преобразования, используя обычный sql, используя spark.sql

Meena Arumugam 20.02.2023 15:56

Другие вопросы по теме