• トップ
  • ブログ一覧
  • 【dbt × BigQuery】増分更新について挙動を詳しく確認してみる【insert_overwrite(Dynamic partitions), append】
  • 【dbt × BigQuery】増分更新について挙動を詳しく確認してみる【insert_overwrite(Dynamic partitions), append】

    モリ(エンジニア)モリ(エンジニア)
    2025.06.23

    IT技術

    こんにちは!

    普段データ関連のお仕事をしています。

    現在所属しているチームではデータマートの作成の際にdbtを使用しており、コスト削減のために差分更新の対応中です。

    差分更新処理を書いていると「あれ?どんな挙動になるんだっけ?」ということが多いので、そんな自分に向けて増分更新の詳しい挙動を確認して、備忘録がてらブログを書いていきたいと思います。

    環境・準備

    • BigQuery
    • dbt

    差分更新の種類

    僕が所属しているチームでは「insert_overwrite(Dynamic partitions)」方式と「append」方式を採用しているので、今回はその2つについて挙動を見ていきたいと思います。

    insert_overwrite(Dynamic partitions)

    下記のモデルの挙動を確認していきます。

    1{{
    2    config(
    3        partition_by={
    4            "field": "time_jst",
    5            "data_type": "datetime",
    6            "granularity": "day",
    7        },
    8        cluster_by=["user_id"],
    9        materialized="incremental",
    10        incremental_strategy="insert_overwrite",
    11    )
    12}}
    13
    14select user_id, datetime(timestamp, "+9") as time_jst
    15from {{ ref("source_table_name") }}
    16{% if is_incremental() %} 
    17where create_date >= current_date("+9") - 7 
    18{% endif %}

    初回実行時は、下記のようなクエリが生成されます。

    1create or replace table `create_table_name`
    2  
    3partition by datetime_trunc(time_jst, day)
    4cluster by user_id
    5as (
    6  select user_id, datetime(timestamp, "+9") as time_jst
    7  from `source_table_name`
    8);

    これで「create_table_name」というテーブルが作成されます。

    そして、差分更新をするために再度実行してみると下記のようなクエリが生成されました。

    1create or replace table `create_table_name__dbt_tmp`
    2  
    3partition by datetime_trunc(time_jst, day)
    4cluster by user_id
    5as (
    6  select user_id, datetime(timestamp, "+9") as time_jst
    7  from `source_table_name`
    8  where create_date >= current_date("+9") - 7 
    9);
    10
    11-- generated script to merge partitions into `create_table_name`
    12declare dbt_partitions_for_replacement array;
    13
    14-- 1. temp table already exists, we used it to check for schema changes
    15-- 2. define partitions to update
    16set (dbt_partitions_for_replacement) = (
    17    select as struct
    18        -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
    19        array_agg(distinct datetime_trunc(time_jst, day) IGNORE NULLS)
    20    from `create_table_name__dbt_tmp`
    21);
    22
    23-- 3. run the merge statement
    24merge into `create_table_name` as DBT_INTERNAL_DEST
    25    using (
    26    select
    27    * from `create_table_name__dbt_tmp`
    28  ) as DBT_INTERNAL_SOURCE
    29    on FALSE
    30when not matched by source
    31      and datetime_trunc(DBT_INTERNAL_DEST.time_jst, day) in unnest(dbt_partitions_for_replacement) 
    32    then delete
    33when not matched then insert
    34    (`user_id`, `time_jst`)
    35values
    36    (`user_id`, `time_jst`)
    37;
    38
    39-- 4. clean up the temp table
    40drop table if exists `create_table_name__dbt_tmp`

    詳しく見ていきます。

     

    まず1~9行目

    1create or replace table `create_table_name__dbt_tmp`
    2  
    3partition by datetime_trunc(time_jst, day)
    4cluster by user_id
    5as (
    6  select user_id, datetime(timestamp, "+9") as time_jst
    7  from `source_table_name`
    8  where create_date >= current_date("+9") - 7 
    9);

    create_table_name__dbt_tmp」という名前の一時テーブルが作成されます

    また、差分更新になるので、モデルの16行目〜18行目に書かれているis_incremental()マクロ内のwhere句が適用されたクエリが生成されています。(上記8行目)

     

    続いて11~21行目

    1-- generated script to merge partitions into `create_table_name`
    2declare dbt_partitions_for_replacement array;
    3
    4-- 1. temp table already exists, we used it to check for schema changes
    5-- 2. define partitions to update
    6set (dbt_partitions_for_replacement) = (
    7    select as struct
    8        -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
    9        array_agg(distinct datetime_trunc(time_jst, day) IGNORE NULLS)
    10    from `create_table_name__dbt_tmp`
    11);

     

    declare dbt_partitions_for_replacement array;

    12行目ではarray型の「dbt_partitions_for_replacement」という変数を宣言しています。

     

    datetime_trunc(time_jst, day)

    19行目では、上記で作成したcreate_table_name__dbt_tmpテーブルのtime_jstカラムを日付で丸めています。

    例えば、time_jstが「2025-05-20 14:23:00」という値だった場合datetime_trunc(time_jst, day)は「2025-05-20 00:00:00」となります。

     

    array_agg(distinct datetime_trunc(time_jst, day) IGNORE NULLS)

    さらに19行目の記述を詳しく見ていくと

    • array_agg()は引数の値を配列にする。
    • distinctで、重複した日付は取り除かれる。
    • IGNORE NULLSにより、NULL 値は配列に含まれない。

    となるので、重複しない日付の配列が作成されます(NULLを含まない)

     

    select as struct

    そして17行目で配列を構造体として返します。

     

    set (dbt_partitions_for_replacement) = ( );

    select文で作成した構造体を「dbt_partitions_for_replacement」という変数に代入しています。

     

    続いて23~37行目

    1-- 3. run the merge statement
    2merge into `create_table_name` as DBT_INTERNAL_DEST
    3    using (
    4    select
    5    * from `create_table_name__dbt_tmp`
    6  ) as DBT_INTERNAL_SOURCE
    7    on FALSE
    8when not matched by source
    9      and datetime_trunc(DBT_INTERNAL_DEST.time_jst, day) in unnest(dbt_partitions_for_replacement) 
    10    then delete
    11when not matched then insert
    12    (`user_id`, `time_jst`)
    13values
    14    (`user_id`, `time_jst`)
    15;

    23~29行目のMERGE文で、ターゲットテーブル(1回目の実行で作られたcreate_table_name)とソーステーブル(差分更新のために一時的に作られたcreate_table_name__dbt_tmpテーブル)をJOINしようとしていますが、条件がFALSEとなっているのでJOINされません。

    そして、30行目の「when not matched by source」はターゲットテーブルにしか存在しない行に対して実行されるため、30~32行目のはターゲットテーブルのレコードが全て削除されるのですが、and datetime_trunc(DBT_INTERNAL_DEST.time_jst, day) in unnest(dbt_partitions_for_replacement)の記述により、dbt_partitions_for_replacement変数に格納された日付の行のみ(日付パーティション)削除されるようになっています。

    そして、33~36行目の記述でソーステーブルのレコードを全てターゲットテーブルにinsertしています。

    最後に40行目で一時テーブルを削除して、処理完了となっています。

     

    つまり、insert_overwrite(Dynamic partitions)

    1. ソーステーブルからモデルに書かれているフィルタを適用したクエリで一時テーブルを作成(1~9行目)
    2. 1 で作成した一時テーブルのパーティションを抽出(11~21行目)
    3. 差分更新するターゲットテーブルのパーティション列の値と、2で抽出したパーティションの値と一致する、ターゲットテーブルのレコードを削除する(24~32行目)
    4. 1 で作成した一時テーブルのレコードを全てターゲットテーブルにinsertする(33~37行目)
    5. 1 で作成した一時テーブルを削除する(40行目)

    という流れで差分更新を実現していることがわかりました!

     

    append

    下記のモデルの挙動を確認していきます。

    1{{
    2    config(
    3        partition_by={
    4            "field": "time_jst",
    5            "data_type": "datetime",
    6            "granularity": "day",
    7        },
    8        materialized="incremental",
    9    )
    10}}
    11
    12select user_id, datetime(timestamp, "+9") as time_jst
    13from {{ ref("source_table_name") }}
    14{% if is_incremental() %}
    15    where
    16        date(timestamp, "+9") >= current_date("+9") - 7
    17        and datetime(timestamp) > (
    18            select max(time_jst)
    19            from {{ this }}
    20            where date(time_jst) >= current_date("+9") - 14
    21        )
    22{% endif %}

    初回実行時は、下記のようなクエリが生成されます。

    1create or replace table `create_table_name`
    2  
    3partition by datetime_trunc(time_jst, day)
    4cluster by user_id
    5as (
    6  select user_id, datetime(timestamp, "+9") as time_jst
    7  from `source_table_name`
    8);

    これでcreate_table_nameというテーブルが作成されます。

    そして、差分更新をするために再度実行してみると下記のようなクエリが生成されました。

    1create or replace table `create_table_name__dbt_tmp`
    2  
    3partition by datetime_trunc(time_jst, day)
    4cluster by user_id
    5as (
    6  select user_id, datetime(timestamp, "+9") as time_jst
    7  from `source_table_name`
    8  where
    9      date(timestamp, "+9") >= current_date("+9") - 7
    10      and datetime(timestamp) > (
    11          select max(time_jst)
    12          from `create_table_name`
    13          where date(time_jst) >= current_date("+9") - 14
    14      )
    15);
    16
    17merge into `create_table_name` as DBT_INTERNAL_DEST
    18using (
    19  select
    20  * from `create_table_name__dbt_tmp`
    21) as DBT_INTERNAL_SOURCE
    22on (FALSE)
    23when not matched then insert
    24  (`user_id`, `access_time_jst`)
    25values
    26  (`user_id`, `access_time_jst`)

    詳しく見ていきます。

     

    まず1~15行目

    1create or replace table `create_table_name__dbt_tmp`
    2  
    3partition by datetime_trunc(time_jst, day)
    4cluster by user_id
    5as (
    6  select user_id, datetime(timestamp, "+9") as time_jst
    7  from `source_table_name`
    8  where
    9      date(timestamp, "+9") >= current_date("+9") - 7
    10      and datetime(timestamp) > (
    11          select max(time_jst)
    12          from `create_table_name`
    13          where date(time_jst) >= current_date("+9") - 14
    14      )
    15);

    create_table_name__dbt_tmp」という名前の一時テーブルが作成されます

    また、差分更新になるので、モデルの14行目〜22行目に書かれているis_incremental()マクロ内のwhere句が適用されたクエリが生成されています。(上記8~14行目)

    where区では初回実行で作成されたcreate_table_nameテーブルの中で一番最新のtime_jstを取得して(上記11~13行目)、その時間より新しい時間のレコードのみをsource_table_nameから取得して一時テーブルを作成しています。

     

    続いて17~26行目

    1merge into `create_table_name` as DBT_INTERNAL_DEST
    2using (
    3  select
    4  * from `create_table_name__dbt_tmp`
    5) as DBT_INTERNAL_SOURCE
    6on (FALSE)
    7when not matched then insert
    8  (`user_id`, `access_time_jst`)
    9values
    10  (`user_id`, `access_time_jst`)

    上記1~6行目のMERGE文で、ターゲットテーブル(1回目の実行で作られたcreate_table_name)とソーステーブル(差分更新のために一時的に作られたcreate_table_name__dbt_tmpテーブル)をJOINしようとしていますが、条件がFALSEとなっているのでJOINされません。

    そして、上記7行目の「when not matched」はソーステーブルにしか存在しない行に対して実行されるため、上記7~10行目はソーステーブルのレコードを全てターゲットテーブルにinsertしています。(ソーステーブルはターゲットテーブルの中で一番最新のtime_jstより新しい時間のレコードのみでテーブルが作成されているため、ターゲットテーブルに存在するレコードはない)

     

    つまり、append

    1. ソーステーブルからモデルに書かれているフィルタを適用したクエリで一時テーブルを作成(1~15行目)
    2. 1 で作成した一時テーブルのレコードを全てターゲットテーブルにinsertする(17~26行目)

    という流れで差分更新を実現していることがわかりました!

     

    終わりに

    普段の業務で使用頻度の少ないMERGE文が入ってきた途端、なんだこの処理は?となってしまいましたが、丁寧に見ていくことで差分更新処理の理解を深められました。

    今回紹介した2種類以外にも差分更新の種類があるので、次回の記事の題材にしようかな〜なんて思います!

     

    参考記事・動画

    ライトコードでは、エンジニアを積極採用中!

    ライトコードでは、エンジニアを積極採用しています!社長と一杯しながらお話しする機会もご用意しております。そのほかカジュアル面談等もございますので、くわしくは採用情報をご確認ください。

    採用情報へ

    モリ(エンジニア)
    モリ(エンジニア)
    Show more...

    おすすめ記事

    エンジニア大募集中!

    ライトコードでは、エンジニアを積極採用中です。

    特に、WEBエンジニアとモバイルエンジニアは是非ご応募お待ちしております!

    また、フリーランスエンジニア様も大募集中です。

    background