В dbt я хочу добавить таблицу, если она существует. По сути, я создаю инкрементную модель, в которой я нахожу значения с задержкой предыдущего дня. Для первого запуска я хочу объединить сегодняшние данные вместе с {{this}}
, я знаю, что для первого дня {{this}}
не имеет значения, поэтому я пытаюсь пропусти это в профсоюзе.
Я использую приведенный ниже запрос, но он возвращает ошибку:
{{
config(
materialized='incremental',
unique_key='date_run',
full_refresh=true
)
}}
with subs_count as
(
select
current_date() as date_run,
count(distinct email_address) as distinct_email,
subscriber
from {{ source('fan_table_sandbox', 'international_fan_data') }}
group by date_run,subscriber
),
aggregated_data as
(
select
*
from subs_count
union all
if (exists((select *
from {{this}} if exists {{this}}
order by date_run desc limit 1))
),
lag_values as
(
select
*,
lag(distinct_email) over (partition by subscriber order by date_run asc) as previous_day_count
from aggregated_data
)
select * from lag_values
Как это можно решить?
Я считаю, что вам нужно использовать is_incremental()
условие.
Есть несколько дополнительных вещей:
nba_subscriber
date_run
в aggregated_data
, а затем разделяете по subscriber
. Это приведет к тому, что с предыдущей даты у вас останется только один подписчик. Вместо этого вы можете использовать Rank().dbt run --full-refresh
все исторические данные будут удалены. Убедитесь, что это ожидаемо. В противном случае измените конфигурацию full_refresh на false, или гораздо лучшим подходом было бы добавить поле create_date в вашу таблицу international_fan_data
на этапах извлечения и загрузки. Тогда вам вообще не нужен инкремент.{{
config(
materialized='incremental',
unique_key='date_run',
full_refresh=true
)
}}
with subs_count as
(
select
current_date() as date_run,
count(distinct email_address) as distinct_email,
subscriber
from {{ source('fan_table_sandbox', 'international_fan_data') }}
group by date_run,subscriber
),
aggregated_data as
(
select
*
from subs_count
{% if is_incremental() %}
union all
select * from {{ this }}
qualify rank() over(order by date_run desc) = 1
{% endif %}
),
lag_values as
(
select
*,
lag(distinct_email) over (partition by subscriber order by date_run asc) as previous_day_count
from aggregated_data
)
select * from lag_values
это сработало, спасибо огромное!
Используйте
is_incremental
. docs.getdbt.com/docs/build/…