Я хочу использовать объединение условий, если оно существует, с таблицей, на которую ссылается {{this}} в SQL-снежинке

В dbt я хочу добавить таблицу, если она существует. По сути, я создаю инкрементную модель, в которой я нахожу значения с задержкой предыдущего дня. Для первого запуска я хочу объединить сегодняшние данные вместе с {{this}}, я знаю, что для первого дня {{this}} не имеет значения, поэтому я пытаюсь пропусти это в профсоюзе.

Я использую приведенный ниже запрос, но он возвращает ошибку:

{{
    config(
        materialized='incremental',
        unique_key='date_run',
        full_refresh=true
    )
}}

with subs_count as 
(
    select 
        current_date() as date_run,
        count(distinct email_address) as distinct_email,
        subscriber
    from {{ source('fan_table_sandbox', 'international_fan_data') }}
    group by date_run,subscriber
),
aggregated_data as 
(
    select
        *
    from subs_count

    union all

    if (exists((select *
                from {{this}} if exists {{this}}
    order by date_run desc limit 1))
),
lag_values as 
(
    select  
        *,
        lag(distinct_email) over (partition by subscriber order by  date_run asc) as previous_day_count
    from aggregated_data
)
select * from lag_values

Как это можно решить?

Используйте is_incremental. docs.getdbt.com/docs/build/…

Adam Kipnis 10.07.2024 17:19
Как регистрировать все результаты тестов DBT в централизованной таблице snowflake
Как регистрировать все результаты тестов DBT в централизованной таблице snowflake
DBT имеет множество встроенных функций для автоматизации трудоемкой работы. Одной из таких функций является тест DBT.
0
1
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я считаю, что вам нужно использовать is_incremental() условие.

Есть несколько дополнительных вещей:

  1. Несуществующий столбец nba_subscriber
  2. Вы берете последнюю версию date_run в aggregated_data, а затем разделяете по subscriber. Это приведет к тому, что с предыдущей даты у вас останется только один подписчик. Вместо этого вы можете использовать Rank().
  3. dbt run --full-refresh все исторические данные будут удалены. Убедитесь, что это ожидаемо. В противном случае измените конфигурацию full_refresh на false, или гораздо лучшим подходом было бы добавить поле create_date в вашу таблицу international_fan_data на этапах извлечения и загрузки. Тогда вам вообще не нужен инкремент.
{{
    config(
        materialized='incremental',
        unique_key='date_run',
        full_refresh=true
    )
}}

with subs_count as 
(
    select 
        current_date() as date_run,
        count(distinct email_address) as distinct_email,
        subscriber
    from {{ source('fan_table_sandbox', 'international_fan_data') }}
    group by date_run,subscriber
),

aggregated_data as 
(
    select
        *
    from subs_count

    {% if is_incremental() %}

    union all

    select * from {{ this }}
    qualify rank() over(order by date_run desc) = 1

    {% endif %}
),

lag_values as 
(
    select  
        *,
        lag(distinct_email) over (partition by subscriber order by  date_run asc) as previous_day_count
    from aggregated_data
)

select * from lag_values

это сработало, спасибо огромное!

Ambreen 11.07.2024 17:36

Другие вопросы по теме

Похожие вопросы