Я пытаюсь найти самое первое значение присваивания переменной, используя ast. например
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
pg_connection0 = "airflow_db1"
pg_connection=pg_connection0
with DAG("demo_processing_dag",
start_date=datetime.datetime(2021, 1, 1),
schedule_interval=None) as dag:
task1 = BashOperator(
task_id = "dag_report",
bash_command = "airflow dags report --output json >> /home/airflow/gcs/data/parse.json"
)
postgres_to_gcs_task = PostgresToGCSOperator(
task_id=f'postgres_to_gcs',
postgres_conn_id=pg_connection,
sql=f'SELECT * FROM public.dag_code;',
bucket = "mybucket",
filename=f'data/dag_code.csv',
export_format='csv',
gzip=False,
use_server_side_cursor=False,
)
Мне нужен вывод как postgres_conn_id="airflow_db1"
Я пытался исследовать библиотеку ast, но получаю результат как pg_connection="pg_connection0"
ниже код, который я пробовал.
import ast
input_code = """
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
pg_connection0 = "airflow_db1"
pg_connection=pg_connection0
with DAG("demo_processing_dag",
start_date=datetime.datetime(2021, 1, 1),
schedule_interval=None) as dag:
task1 = BashOperator(
task_id = "dag_report",
bash_command = "airflow dags report --output json >> /home/airflow/gcs/data/parse.json"
)
postgres_to_gcs_task = PostgresToGCSOperator(
task_id=f'postgres_to_gcs',
postgres_conn_id=pg_connection,
sql=f'SELECT * FROM public.dag_code;',
bucket = "mybucket",
filename=f'data/dag_code.csv',
export_format='csv',
gzip=False,
use_server_side_cursor=False,
)
"""
ast_tree = ast.parse(input_code)
# find the variable assignment for postgres_conn_id and extract its value
for node in ast.walk(ast_tree):
if isinstance(node, ast.Assign) and node.targets[0].id == 'pg_connection':
if isinstance(node.value, ast.Name):
postgres_conn_id = node.targets[0].id + ' = "' + node.value.id + '"'
else:
print( node.value.s)
postgres_conn_id = node.targets[0].id + ' = "' + node.value.s + '"'
break
# print the output
print(postgres_conn_id)
Как я могу получить желаемый результат Бармар
И чтобы получить значение "airflow_db1"
, вам также необходимо моделировать поток данных, запоминая каждое назначение и сопоставляя его с источником.
Я не очень хорошо знаю библиотеку AST. Но вы должны искать узлы вызова функций, а не узлы назначения.
Пройдитесь по узлам дерева, пока не найдете присваивание константе.
for node in ast.walk(ast_tree):
if isinstance(node, ast.keyword) and node.arg == 'postgres_conn_id':
value = node.value
while True:
for node2 in ast.walk(ast_tree):
if not isinstance(node2, ast.Assign):
continue
if node2.targets[0].id == value.id:
value = node2.value
break
if isinstance(value, ast.Constant):
break
print(f'{node.arg} = "{value.value}"')
Это не присвоение переменной. Это именованный аргумент функции.