
Airflowでよく使う便利なOperator一覧
2023.11.20
はじめに

Airflowは分析基盤でよく使われるワークフロー管理プラットフォーム、僕なりに例えると「リッチなcrontab」ですね。
いろいろなタスクをOperatorというクラスを使って定義するのですが、今回はよく使う便利なOperatorをまとめてみました。
DummyOperator
何も実行しないOperatorです。
まだ具体的なタスクの実装ができていない場合の仮のタスクとしたり、複数タスクをまとめる終了タスクとして使用したり、条件分岐したあとの何もしないタスクが必要な時に使用したりします。
また、DAGを途中から実行したいことがよくあるのですが、その時に選択できる中間ポイントとして使うこともあります。
以下の例だとstart_bのDownstreamをClearするとtask_b1とtask_b2だけ実行することができます。
最初にAirflowを勉強していた時は何の意味があるんだ? と思っていましたが、たくさんのタスクのフローを定義しているとほぼ必ず必要になるOperatorです。
1 2 3 4 5 6 7 | from airflow.operators.dummy_operator import DummyOperator start = DummyOperator(task_id='start', dag=dag) start_a = DummyOperator(task_id='start_a', dag=dag) start_b = DummyOperator(task_id='start_b', dag=dag) end = DummyOperator(task_id='end', dag=dag) start >> start_a >> [task_a1, task_a2, task_a3] >> start_b >> [task_b1, task_b2] >> end |
BashOperator
Bashを実行するOperatorです。
システムコマンドや.sh
ファイルを実行することができます。
Bashゆえに何でもできてしまうのですが、Bashばかり使っていると逆にわかりにくくなってしまうことも多いです。
1 2 3 4 5 6 | from airflow.operators.bash_operator import BashOperator task = BashOperator( task_id='echo' bash_command='echo hello' dag=dag ) |
PythonOperator
Python関数を実行するOperatorです。
Pythonで処理を書けるのでいろいろなことができます。
もちろんpandasやnumpy, sklearnを使ってなにかすることができますが、あまりに重いような処理はGKEなど別の環境で動かしたほうがいいこともあります。
1 2 3 4 5 6 7 8 9 10 11 | from airflow.operators.python_operator import PythonOperator def my_python_function(): # ここにPythonのコードを書く print("Hello from PythonOperator!") python_task = PythonOperator( task_id='my_python_task', python_callable=my_python_function, dag=dag ) |
BranchPythonOperator
Python関数の実行結果によって、次に実行するべきタスクを自動で決定することができます。
条件付きのワークフローが必要な場面でとても便利です。
python_callableで渡す関数は、次に実行するタスクのtask_idを返却するようにします。
例えば、対象のデータソースが更新されているかチェックして、更新していなければ実行をスキップするようにして、効率の良いデータパイプラインを作ることができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from airflow.operators.python_operator import BranchPythonOperator def check_dataset_update_time(): # ここで条件判定を行う if some_condition: return 'do_task' else: return 'skip_task' branch_task = BranchPythonOperator( task_id='branch_task', python_callable=check_dataset_update_time, dag=dag ) do_task = DummyOperator(task_id='do_task', dag=dag) skip_task = DummyOperator(task_id='skip_task', dag=dag) branch_task >> [do_task, skip_task] |
BigQueryInsertJobOperator
BigQuery関係のクラスはいろいろあるのですが、古くて非推奨のOperatorもあるので注意が必要です。
このOperatorは、BigQueryのジョブを実行するオペレーターです。
destinationTableを指定すると、そのテーブルにクエリ結果が入ります。
destinationTableを指定せずに、CREATE OR REPLACE TABLE などを使うこともできますし、DELETE文などを実行することもできます、SQLでどのようなクエリでも実行することができます。
1 2 3 4 5 6 7 | from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator task = BigQueryInsertJobOperator( task_id='bq_task', configuration={ "query": { "query": "SELECT * FROM <code>sample_project_id.sample_dataset_id.sample_source_table_id</code>", "destinationTable": { "projectId": "sample_project_id", "datasetId": "sample_dataset_id", "tableId": "sample_destination_table_id" }, "writeDisposition": "WRITE_TRUNCATE", "useLegacySql": False, } } ) |
TaskGroup
Operatorとは少し異なりますが、タスクをグループにまとめて管理しやすくするクラスです。
Airflow2.0で追加されました。
TaskGroupをクリックすることで、開閉することができますし、マウスをホバーするとTaskGroupの中のタスク全てでの実行時間も表示されます。
特にタスクの数が多い場合や複雑なDAGの場合でもシンプルに見せることができますし、コードもシンプルになりますのでとても便利です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | from airflow.utils.task_group import TaskGroup start = DummyOperator(task_id='start', dag=dag) with TaskGroup("step1") as step1: task1 = DummyOperator(task_id='task1', dag=dag) task2 = DummyOperator(task_id='task2', dag=dag) task3 = DummyOperator(task_id='task3', dag=dag) task1 >> task2 >> task3 with TaskGroup("step2") as step2: task4 = DummyOperator(task_id='task4', dag=dag) task5 = DummyOperator(task_id='task5', dag=dag) task4 >> task5 end = DummyOperator(task_id='end', dag=dag) start >> step1 >> step2 >> end |
まとめ
今回は、Airflowでよく使うOperatorを紹介しました。
他にも、SlackWebhookOperator、EmailOperator、GCSToBigQueryOperator、ComputeEngineStartInstanceOperator、EC2StopInstanceOperatorなどとてもたくさんのOperatorがあります。できることが山ほどあって楽しいですね。
適切なOperatorを選んで構築することで、タスクの可読性や保守性が大幅に向上します。
Airflowを使うときは、やりたいことにあったOperatorを選び、効率的なワークフローを構築してみてください。
書いた人はこんな人

IT技術11月 20, 2023Airflowでよく使う便利なOperator一覧
IT技術11月 16, 2023AirflowでJinjaテンプレートを使ってSQLを実行する
IT技術5月 22, 2023【ISUCON部】ChatGPTとWebプログラミング
IT技術8月 20, 2021統計検定準1級(CBT)に合格した話