
AirflowでJinjaテンプレートを使ってSQLを実行する
2023.11.16
はじめに

この記事では、AirflowでJinjaテンプレートを活用したSQLクエリを動的に生成し、BigQueryでそのクエリを実行する方法をまとめます。
JinjaはPythonのテンプレートエンジンで、HTMLを動的に生成するために使われることが多いですが、SQLでも「大体同じなのに少し違うクエリ」が複数あるようなときに大活躍しますよ。
AirflowでJinjaテンプレートを使う方法
AirflowはなんとデフォルトでJinjaテンプレートエンジンをサポートしています。
特に何もしなくてもDAGでJinjaのプレースホルダや変数をタスクのパラメータやクエリ内で直接使用することができます。
また、Operatorに引数を渡すことで渡した引数を埋め込むことができます。
SQLのテンプレートファイルを作成する
まずは、SQLファイルを作成します。Pythonファイル内にクエリを記述しても良いですが、SQLファイルに記述した方が管理がしやすくておすすめです。
ここでは例として、指定した父を持つ馬の芝/ダート別、枠番別の勝率を指定するクエリを作り、data/sql/aggregate_by_track_and_waku.sql
に配置しました。
{{ params.father_set }}
のところに注目してください。ここではパラメータとしていますが、DAGから渡した値がここに入るようになります。
1 2 3 4 5 6 7 8 9 10 | SELECT track, wakuban, AVG(CASE WHEN result = 1 THEN 1 ELSE 0 END) AS tanshou_ratio FROM sample_project_id.sample_keiba_dataset_dev.race_results WHERE father IN {{ params.father_set }} GROUP BY track, wakuban |
BigQueryInsertJobOperatorでタスクを定義する
今度はクエリを実行するタスクをDAGに定義します。
まず、DAGクラスの引数 template_searchpath
にAirflowがテンプレートを探すときの基準のパスをリストで指定します。ようはSQLのテンプレートを格納しているフォルダですね。
次に、BigQueryInsertJobOperatorの引数configurationにクエリを渡しますが、Jinjaの書き方で作ほど作成したSQLのテンプレートファイルをincludeするようにしておきます。
paramsにJinjaで利用する変数を渡します。今回は主要な芝の種牡馬で集計してみましょう、('ディープインパクト', 'ハーツクライ', 'ステイゴールド')
をfather_setとします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator query_file_name = "aggregate_by_track_and_waku.sql" with DAG( 'bigquery_jinja2_example', start_date=datetime(2023, 11, 1), template_searchpath=["/home/airflow/gcs/data/sql/"], schedule_interval=None ) as dag: task = BigQueryInsertJobOperator( task_id='aggregate_famous_siba_father', configuration={ "query": { "query": f"{{% include '{query_file_name}' %}}", "useLegacySql": False, "destinationTable": { "projectId": PROJECT_NAME, "datasetId": DESTINATION_DATASET_NAME, "tableId": DESTINATION_TABLE_NAME, }, } }, params={"father_set": "('ディープインパクト', 'ハーツクライ', 'ステイゴールド')"}, ) |
では、AirflowのWebUIから実行してみます。
成功で実行完了したら、TaskInstanceをクリックしてRendered
のボタンをクリックします。
ここでは、渡したパラメータが埋め込まれた、実際に実行されたクエリを確認することができます。便利ですね。もちろん、father_setで別の条件を指定するだけで、別の条件によるクエリが実行できます。
if分岐させる
単に変数を埋め込むだけならPythonのf-stringなんかでも事足りますので、もう少しだけ複雑な例をみてみましょう。
先ほどのクエリでfather_setを渡さない場合、つまり全体での集計結果を出します。
1 2 3 4 5 6 7 8 9 10 11 12 | SELECT track, wakuban, AVG(CASE WHEN result = 1 THEN 1 ELSE 0 END) AS tanshou_ratio FROM sample_project_id.sample_keiba_dataset_dev.race_cards {%- if params.father_set|default(none) %} WHERE father IN {{ params.father_set }} {%- endif %} GROUP BY track, wakuban |
params.father_set|default(none)
は、father_setがparamsに存在しない場合のデフォルト値がnoneとなるように指定しています。
noneの場合はこのif文の条件がfalseになるので、WHERE句はレンダリングされません。
DAG側は概ね同様です。paramsだけ渡さないようにします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | with DAG( 'bigquery_jinja2_example2', start_date=datetime(2023, 11, 1), template_searchpath=["/home/airflow/gcs/data/sql/"], schedule_interval=None ) as dag: task = BigQueryInsertJobOperator( task_id='aggregate_famous_siba_father2', configuration={ "query": { "query": f"{{% include '{query_file_name}' %}}", "useLegacySql": False, "destinationTable": { "projectId": PROJECT_NAME, "datasetId": DESTINATION_DATASET_NAME, "tableId": DESTINATION_TABLE_NAME, }, "writeDisposition": "WRITE_TRUNCATE", } }, ) |

グローバルな変数をjinja2に渡す
開発環境と本番環境で異なるデータセットを参照したいような場合は、ひとつひとつのクエリでparamsに指定するのではなく、環境全体で指定したいですよね。
そのような場合はDAGにuser_defined_macrosを指定することでシンプルに記述することができます。
1 2 3 4 5 6 7 8 9 10 11 12 | SELECT track, wakuban, AVG(CASE WHEN result = 1 THEN 1 ELSE 0 END) AS tanshou_ratio FROM {{ KEIBA_PROJECT_ID }}.{{ KEIBA_DATASET }}.race_cards {%- if params.father_set|default(none) %} WHERE father IN {{ params.father_set }} {%- endif %} GROUP BY track, wakuban |
テーブル名を指定しているところで{{ KEIBA_PROJECT_ID }}.{{ KEIBA_DATASET }}.race_cards
としました。
params.としないところがポイントです。
DAGは以下のように定義します。
DAGの引数user_defined_macrosにDAG内でグローバルに使う変数を辞書として渡しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | with DAG( 'bigquery_jinja2_example3', start_date=datetime(2023, 11, 1), template_searchpath=["/home/airflow/gcs/data/sql/"], user_defined_macros={"KEIBA_PROJECT_ID": "sample_project_id", "KEIBA_DATASET": "sample_keiba_dataset_dev"}, schedule_interval=None ) as dag: task = BigQueryInsertJobOperator( task_id='aggregate_famous_siba_father3', configuration={ "query": { "query": f"{{% include '{query_file_name}' %}}", "useLegacySql": False, "destinationTable": { "projectId": PROJECT_NAME, "datasetId": DESTINATION_DATASET_NAME, "tableId": DESTINATION_TABLE_NAME, }, "writeDisposition": "WRITE_TRUNCATE", } }, ) |
ここでは辞書をベタ書きしてDAGに渡していますが、configファイルなどに切り出して各DAGで共通して参照するようにすると良さそうです。
また、user_defined_macrosは変数だけでなく関数も定義できるらしいです。(あまりやりすぎても複雑になりそうですが)
まとめ
AirflowとBigQueryにjinja2テンプレートの機能を組み合わせて使うことで、変数や条件に応じて動的にクエリを生成し実行することができます。
jinja2テンプレートといえばWebで利用するものだと思っていたのですがSQLを書く時にも大活躍です。使っていない方は試してみてください。
書いた人はこんな人

IT技術11月 20, 2023Airflowでよく使う便利なOperator一覧
IT技術11月 16, 2023AirflowでJinjaテンプレートを使ってSQLを実行する
IT技術5月 22, 2023【ISUCON部】ChatGPTとWebプログラミング
IT技術8月 20, 2021統計検定準1級(CBT)に合格した話