У меня есть оператор ClickHouse в Airflow, и я хочу выполнять запросы к базе данных по расписанию. Мне нужно, чтобы переменная ds заменялась в фильтре запроса каждый раз при запуске задания. вот мой код:
default_args = {
'owner': 'nestor',
'depends_on_past': False,
'start_date': datetime(2024, 2, 26),
'email': ['[email protected]'],
'email_on_failure': False,
'execution_timeout': timedelta(minutes=15),
}
with DAG(
"agg_to_clickhouse",
schedule_interval=None,
default_args=default_args,
) as dag:
insert_task = ClickhouseOperator(
task_id = "insert_task",
ch_connection_id = "clickhouse_db_connection",
sql = """
INSERT INTO tmp.target_tbl
SELECT created_date,count() AS total_ride,
count(DISTINCT(passengerId)) AS total_passenger
FROM tmp.source_tbl
WHERE created_date >= '{{ds}}'
GROUP BY created_date
""",
dag=dag,
)
но он не заменяет ds значением. Я пробовал другие синтаксисы, но это не сработало.
нравиться:
'{{{{ds}}}}'
{{{{дс}}}}
"{{дс}}"
Я реализовал это самостоятельно
Хотя я рекомендую не изобретать велосипед, вы можете посмотреть , как плагин реализует шаблонные поля . Вам необходимо объявить template_fields
и template_fields_renderers
для Airflow, чтобы отобразить значение в виде шаблона с контекстом запуска.
Я добавил эту строку кода в пользовательский оператор, и это решило мою проблему:
template_fields: Sequence[str] = ("sql",)
Вы реализуете
ClickhouseOperator
самостоятельно или используете этот плагин?