• トップ
  • ブログ一覧
  • AirflowでJinjaテンプレートを使ってSQLを実行する
  • AirflowでJinjaテンプレートを使ってSQLを実行する

    新田(エンジニア)新田(エンジニア)
    2023.11.16

    IT技術

    はじめに

    新田新田
    こんにちは、普段は分析基盤や分析のお仕事をしている新田です。
    この記事では、AirflowでJinjaテンプレートを活用したSQLクエリを動的に生成し、BigQueryでそのクエリを実行する方法をまとめます。
    JinjaはPythonのテンプレートエンジンで、HTMLを動的に生成するために使われることが多いですが、SQLでも「大体同じなのに少し違うクエリ」が複数あるようなときに大活躍しますよ。

    AirflowでJinjaテンプレートを使う方法

    AirflowはなんとデフォルトでJinjaテンプレートエンジンをサポートしています。
    特に何もしなくてもDAGでJinjaのプレースホルダや変数をタスクのパラメータやクエリ内で直接使用することができます。
    また、Operatorに引数を渡すことで渡した引数を埋め込むことができます。

    SQLのテンプレートファイルを作成する

    まずは、SQLファイルを作成します。Pythonファイル内にクエリを記述しても良いですが、SQLファイルに記述した方が管理がしやすくておすすめです。
    ここでは例として、指定した父を持つ馬の芝/ダート別、枠番別の勝率を指定するクエリを作り、data/sql/aggregate_by_track_and_waku.sql に配置しました。
    {{ params.father_set }}のところに注目してください。ここではパラメータとしていますが、DAGから渡した値がここに入るようになります。

    1SELECT
    2  track,
    3  wakuban,
    4  AVG(CASE WHEN result = 1 THEN 1 ELSE 0 END) AS tanshou_ratio
    5FROM

    BigQueryInsertJobOperatorでタスクを定義する

    今度はクエリを実行するタスクをDAGに定義します。
    まず、DAGクラスの引数 template_searchpath にAirflowがテンプレートを探すときの基準のパスをリストで指定します。ようはSQLのテンプレートを格納しているフォルダですね。
    次に、BigQueryInsertJobOperatorの引数configurationにクエリを渡しますが、Jinjaの書き方で作ほど作成したSQLのテンプレートファイルをincludeするようにしておきます。
    paramsにJinjaで利用する変数を渡します。今回は主要な芝の種牡馬で集計してみましょう、('ディープインパクト', 'ハーツクライ', 'ステイゴールド')をfather_setとします。

    1from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    2
    3query_file_name = "aggregate_by_track_and_waku.sql"
    4with DAG(
    5    'bigquery_jinja2_example', start_date=datetime(2023, 11, 1),
    6    template_searchpath=["/home/airflow/gcs/data/sql/"],
    7    schedule_interval=None
    8) as dag:
    9    task = BigQueryInsertJobOperator(
    10        task_id='aggregate_famous_siba_father',
    11        configuration={
    12            "query": {
    13                "query": f"{{% include '{query_file_name}' %}}",
    14                "useLegacySql": False,
    15                "destinationTable": {
    16                    "projectId": PROJECT_NAME,
    17                    "datasetId": DESTINATION_DATASET_NAME,
    18                    "tableId": DESTINATION_TABLE_NAME,
    19                },
    20            }
    21        },
    22        params={"father_set": "('ディープインパクト', 'ハーツクライ', 'ステイゴールド')"},
    23    )

    では、AirflowのWebUIから実行してみます。
    成功で実行完了したら、TaskInstanceをクリックしてRenderedのボタンをクリックします。

    ここでは、渡したパラメータが埋め込まれた、実際に実行されたクエリを確認することができます。便利ですね。もちろん、father_setで別の条件を指定するだけで、別の条件によるクエリが実行できます。

    if分岐させる

    単に変数を埋め込むだけならPythonのf-stringなんかでも事足りますので、もう少しだけ複雑な例をみてみましょう。
    先ほどのクエリでfather_setを渡さない場合、つまり全体での集計結果を出します。

    1SELECT
    2  track,
    3  wakuban,
    4  AVG(CASE WHEN result = 1 THEN 1 ELSE 0 END) AS tanshou_ratio
    5FROM

    params.father_set|default(none) は、father_setがparamsに存在しない場合のデフォルト値がnoneとなるように指定しています。
    noneの場合はこのif文の条件がfalseになるので、WHERE句はレンダリングされません。
    DAG側は概ね同様です。paramsだけ渡さないようにします。

    1with DAG(
    2    'bigquery_jinja2_example2', start_date=datetime(2023, 11, 1),
    3    template_searchpath=["/home/airflow/gcs/data/sql/"],
    4    schedule_interval=None
    5) as dag:
    6    task = BigQueryInsertJobOperator(
    7        task_id='aggregate_famous_siba_father2',
    8        configuration={
    9            "query": {
    10                "query": f"{{% include '{query_file_name}' %}}",
    11                "useLegacySql": False,
    12                "destinationTable": {
    13                    "projectId": PROJECT_NAME,
    14                    "datasetId": DESTINATION_DATASET_NAME,
    15                    "tableId": DESTINATION_TABLE_NAME,
    16                },
    17                "writeDisposition": "WRITE_TRUNCATE",
    18            }
    19        },
    20    )
    同じSQLファイルで全体の集計ができました。

     

    グローバルな変数をjinja2に渡す

    開発環境と本番環境で異なるデータセットを参照したいような場合は、ひとつひとつのクエリでparamsに指定するのではなく、環境全体で指定したいですよね。
    そのような場合はDAGにuser_defined_macrosを指定することでシンプルに記述することができます。

    1SELECT
    2  track,
    3  wakuban,
    4  AVG(CASE WHEN result = 1 THEN 1 ELSE 0 END) AS tanshou_ratio
    5FROM

    テーブル名を指定しているところで{{ KEIBA_PROJECT_ID }}.{{ KEIBA_DATASET }}.race_cardsとしました。
    params.としないところがポイントです。
    DAGは以下のように定義します。
    DAGの引数user_defined_macrosにDAG内でグローバルに使う変数を辞書として渡しています。

    1with DAG(
    2    'bigquery_jinja2_example3', start_date=datetime(2023, 11, 1),
    3    template_searchpath=["/home/airflow/gcs/data/sql/"],
    4    user_defined_macros={"KEIBA_PROJECT_ID": "sample_project_id", "KEIBA_DATASET": "sample_keiba_dataset_dev"},
    5    schedule_interval=None
    6) as dag:
    7    task = BigQueryInsertJobOperator(
    8        task_id='aggregate_famous_siba_father3',
    9        configuration={
    10            "query": {
    11                "query": f"{{% include '{query_file_name}' %}}",
    12                "useLegacySql": False,
    13                "destinationTable": {
    14                    "projectId": PROJECT_NAME,
    15                    "datasetId": DESTINATION_DATASET_NAME,
    16                    "tableId": DESTINATION_TABLE_NAME,
    17                },
    18                "writeDisposition": "WRITE_TRUNCATE",
    19            }
    20        },
    21    )

    ここでは辞書をベタ書きしてDAGに渡していますが、configファイルなどに切り出して各DAGで共通して参照するようにすると良さそうです。
    また、user_defined_macrosは変数だけでなく関数も定義できるらしいです。(あまりやりすぎても複雑になりそうですが)

    まとめ

    AirflowとBigQueryにjinja2テンプレートの機能を組み合わせて使うことで、変数や条件に応じて動的にクエリを生成し実行することができます。
    jinja2テンプレートといえばWebで利用するものだと思っていたのですがSQLを書く時にも大活躍です。使っていない方は試してみてください。

    新田(エンジニア)

    新田(エンジニア)

    おすすめ記事