Я пытаюсь передать строку f в параметр conf TriggerDagRunOperator. Эти задачи выполняются в цикле for, и external_dag_id изменяется на каждой итерации.
Я пробовал следующее:
previous_task_id = 'get_config_for_dag_' + external_dag_id
conf=f"{{{{ ti.xcom_pull(task_ids = {previous_task_id} , key='return_value') }}}}"
Однако я получаю ошибку jinja:
jinja2.exceptions.UndefinedError: «get_config_for_dag_bla_bla_sbx» не определен
Кто-нибудь видит ошибку в строке f, которую я строю?
Вот более полная часть кода:
dag_info = ['bla_bla_sbx']
for external_dag_id in dag_info:
get_config_for_dag = PythonOperator(
task_id = "get_config_for_dag_" + external_dag_id,
python_callable=get_dict_value,
op_kwargs = {
'config_dict': "{{ ti.xcom_pull(task_ids='build_dags_configuration', key='return_value') }}",
'current_dag_id': external_dag_id
}
)
previous_task_id = 'get_config_for_dag_' + external_dag_id
run_dag = TriggerDagRunOperator(
task_id = "run_dag_" + external_dag_id,
wait_for_completion=True,
trigger_dag_id=external_dag_id,
pool = "dag_pool",
conf=f"{{{{ ti.xcom_pull(task_ids = {previous_task_id} , key='return_value') }}}}",
)
Попробуйте добавить цитату в task_ids : '{previous_task_id}'
f"{{{{ ti.xcom_pull(task_ids='{previous_task_id}' , key='return_value') }}}}"
В итоге получилось так:
f"{{{{ ti.xcom_pull(task_ids='get_config_for_dag_{external_dag_id}' , key='return_value') }}}}"
Я думаю, что это было связано с кавычками, но по какой-то причине это не сработало.