Airflowでよく使う便利なOperator一覧
IT技術
はじめに
Airflowは分析基盤でよく使われるワークフロー管理プラットフォーム、僕なりに例えると「リッチなcrontab」ですね。
いろいろなタスクをOperatorというクラスを使って定義するのですが、今回はよく使う便利なOperatorをまとめてみました。
DummyOperator
何も実行しないOperatorです。
まだ具体的なタスクの実装ができていない場合の仮のタスクとしたり、複数タスクをまとめる終了タスクとして使用したり、条件分岐したあとの何もしないタスクが必要な時に使用したりします。
また、DAGを途中から実行したいことがよくあるのですが、その時に選択できる中間ポイントとして使うこともあります。
以下の例だとstart_bのDownstreamをClearするとtask_b1とtask_b2だけ実行することができます。
最初にAirflowを勉強していた時は何の意味があるんだ? と思っていましたが、たくさんのタスクのフローを定義しているとほぼ必ず必要になるOperatorです。
1from airflow.operators.dummy_operator import DummyOperator
2
3start = DummyOperator(task_id='start', dag=dag)
4start_a = DummyOperator(task_id='start_a', dag=dag)
5start_b = DummyOperator(task_id='start_b', dag=dag)
6end = DummyOperator(task_id='end', dag=dag)
7start >> start_a >> [task_a1, task_a2, task_a3] >> start_b >> [task_b1, task_b2] >> end
BashOperator
Bashを実行するOperatorです。
システムコマンドや.sh
ファイルを実行することができます。
Bashゆえに何でもできてしまうのですが、Bashばかり使っていると逆にわかりにくくなってしまうことも多いです。
1from airflow.operators.bash_operator import BashOperator
2task = BashOperator(
3 task_id='echo'
4 bash_command='echo hello'
5 dag=dag
6)
PythonOperator
Python関数を実行するOperatorです。
Pythonで処理を書けるのでいろいろなことができます。
もちろんpandasやnumpy, sklearnを使ってなにかすることができますが、あまりに重いような処理はGKEなど別の環境で動かしたほうがいいこともあります。
1from airflow.operators.python_operator import PythonOperator
2
3def my_python_function():
4 # ここにPythonのコードを書く
5 print("Hello from PythonOperator!")
6
7python_task = PythonOperator(
8 task_id='my_python_task',
9 python_callable=my_python_function,
10 dag=dag
11)
BranchPythonOperator
Python関数の実行結果によって、次に実行するべきタスクを自動で決定することができます。
条件付きのワークフローが必要な場面でとても便利です。
python_callableで渡す関数は、次に実行するタスクのtask_idを返却するようにします。
例えば、対象のデータソースが更新されているかチェックして、更新していなければ実行をスキップするようにして、効率の良いデータパイプラインを作ることができます。
1from airflow.operators.python_operator import BranchPythonOperator
2
3def check_dataset_update_time():
4 # ここで条件判定を行う
5 if some_condition:
6 return 'do_task'
7 else:
8 return 'skip_task'
9
10branch_task = BranchPythonOperator(
11 task_id='branch_task',
12 python_callable=check_dataset_update_time,
13 dag=dag
14)
15
16do_task = DummyOperator(task_id='do_task', dag=dag)
17skip_task = DummyOperator(task_id='skip_task', dag=dag)
18
19branch_task >> [do_task, skip_task]
BigQueryInsertJobOperator
BigQuery関係のクラスはいろいろあるのですが、古くて非推奨のOperatorもあるので注意が必要です。
このOperatorは、BigQueryのジョブを実行するオペレーターです。
destinationTableを指定すると、そのテーブルにクエリ結果が入ります。
destinationTableを指定せずに、CREATE OR REPLACE TABLE などを使うこともできますし、DELETE文などを実行することもできます、SQLでどのようなクエリでも実行することができます。
1from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
2
3task = BigQueryInsertJobOperator(
4 task_id='bq_task',
5 configuration={
6 "query": {
7 "query": "SELECT * FROM
TaskGroup
Operatorとは少し異なりますが、タスクをグループにまとめて管理しやすくするクラスです。
Airflow2.0で追加されました。
TaskGroupをクリックすることで、開閉することができますし、マウスをホバーするとTaskGroupの中のタスク全てでの実行時間も表示されます。
特にタスクの数が多い場合や複雑なDAGの場合でもシンプルに見せることができますし、コードもシンプルになりますのでとても便利です。
1from airflow.utils.task_group import TaskGroup
2
3start = DummyOperator(task_id='start', dag=dag)
4with TaskGroup("step1") as step1:
5 task1 = DummyOperator(task_id='task1', dag=dag)
6 task2 = DummyOperator(task_id='task2', dag=dag)
7 task3 = DummyOperator(task_id='task3', dag=dag)
8 task1 >> task2 >> task3
9
10with TaskGroup("step2") as step2:
11 task4 = DummyOperator(task_id='task4', dag=dag)
12 task5 = DummyOperator(task_id='task5', dag=dag)
13 task4 >> task5
14
15end = DummyOperator(task_id='end', dag=dag)
16start >> step1 >> step2 >> end
まとめ
今回は、Airflowでよく使うOperatorを紹介しました。
他にも、SlackWebhookOperator、EmailOperator、GCSToBigQueryOperator、ComputeEngineStartInstanceOperator、EC2StopInstanceOperatorなどとてもたくさんのOperatorがあります。できることが山ほどあって楽しいですね。
適切なOperatorを選んで構築することで、タスクの可読性や保守性が大幅に向上します。
Airflowを使うときは、やりたいことにあったOperatorを選び、効率的なワークフローを構築してみてください。
ライトコードでは、エンジニアを積極採用中!
ライトコードでは、エンジニアを積極採用しています!社長と一杯しながらお話しする機会もご用意しております。そのほかカジュアル面談等もございますので、くわしくは採用情報をご確認ください。
採用情報へ
競馬が好きです。