• トップ
  • ブログ一覧
  • Airflowでよく使う便利なOperator一覧
  • Airflowでよく使う便利なOperator一覧

    新田(エンジニア)新田(エンジニア)
    2023.11.20

    IT技術

    はじめに

    新田新田
    こんにちは、分析基盤や分析のお仕事をしている新田です。
    Airflowは分析基盤でよく使われるワークフロー管理プラットフォーム、僕なりに例えると「リッチなcrontab」ですね。
    いろいろなタスクをOperatorというクラスを使って定義するのですが、今回はよく使う便利なOperatorをまとめてみました。

    DummyOperator

    何も実行しないOperatorです。
    まだ具体的なタスクの実装ができていない場合の仮のタスクとしたり、複数タスクをまとめる終了タスクとして使用したり、条件分岐したあとの何もしないタスクが必要な時に使用したりします。
    また、DAGを途中から実行したいことがよくあるのですが、その時に選択できる中間ポイントとして使うこともあります。
    以下の例だとstart_bのDownstreamをClearするとtask_b1とtask_b2だけ実行することができます。
    最初にAirflowを勉強していた時は何の意味があるんだ? と思っていましたが、たくさんのタスクのフローを定義しているとほぼ必ず必要になるOperatorです。

    1from airflow.operators.dummy_operator import DummyOperator
    2
    3start = DummyOperator(task_id='start', dag=dag)
    4start_a = DummyOperator(task_id='start_a', dag=dag)
    5start_b = DummyOperator(task_id='start_b', dag=dag)
    6end = DummyOperator(task_id='end', dag=dag)
    7start >> start_a >> [task_a1, task_a2, task_a3] >> start_b >> [task_b1, task_b2] >> end

    BashOperator

    Bashを実行するOperatorです。
    システムコマンドや.shファイルを実行することができます。
    Bashゆえに何でもできてしまうのですが、Bashばかり使っていると逆にわかりにくくなってしまうことも多いです

    1from airflow.operators.bash_operator import BashOperator
    2task = BashOperator(
    3    task_id='echo'
    4    bash_command='echo hello'
    5    dag=dag
    6)

    PythonOperator

    Python関数を実行するOperatorです。
    Pythonで処理を書けるのでいろいろなことができます。
    もちろんpandasやnumpy, sklearnを使ってなにかすることができますが、あまりに重いような処理はGKEなど別の環境で動かしたほうがいいこともあります。

    1from airflow.operators.python_operator import PythonOperator
    2
    3def my_python_function():
    4    # ここにPythonのコードを書く
    5    print("Hello from PythonOperator!")
    6
    7python_task = PythonOperator(
    8    task_id='my_python_task',
    9    python_callable=my_python_function,
    10    dag=dag
    11)

    BranchPythonOperator

    Python関数の実行結果によって、次に実行するべきタスクを自動で決定することができます。
    条件付きのワークフローが必要な場面でとても便利です。
    python_callableで渡す関数は、次に実行するタスクのtask_idを返却するようにします。
    例えば、対象のデータソースが更新されているかチェックして、更新していなければ実行をスキップするようにして、効率の良いデータパイプラインを作ることができます。

    1from airflow.operators.python_operator import BranchPythonOperator
    2
    3def check_dataset_update_time():
    4    # ここで条件判定を行う
    5    if some_condition:
    6        return 'do_task'
    7    else:
    8        return 'skip_task'
    9
    10branch_task = BranchPythonOperator(
    11    task_id='branch_task',
    12    python_callable=check_dataset_update_time,
    13    dag=dag
    14)
    15
    16do_task = DummyOperator(task_id='do_task', dag=dag)
    17skip_task = DummyOperator(task_id='skip_task', dag=dag)
    18
    19branch_task >> [do_task, skip_task]

    BigQueryInsertJobOperator

    BigQuery関係のクラスはいろいろあるのですが、古くて非推奨のOperatorもあるので注意が必要です。
    このOperatorは、BigQueryのジョブを実行するオペレーターです。
    destinationTableを指定すると、そのテーブルにクエリ結果が入ります。
    destinationTableを指定せずに、CREATE OR REPLACE TABLE などを使うこともできますし、DELETE文などを実行することもできます、SQLでどのようなクエリでも実行することができます。

    1from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    2
    3task = BigQueryInsertJobOperator(
    4    task_id='bq_task',
    5    configuration={
    6        "query": {
    7            "query": "SELECT * FROM

    TaskGroup

    Operatorとは少し異なりますが、タスクをグループにまとめて管理しやすくするクラスです。
    Airflow2.0で追加されました。
    TaskGroupをクリックすることで、開閉することができますし、マウスをホバーするとTaskGroupの中のタスク全てでの実行時間も表示されます。
    特にタスクの数が多い場合や複雑なDAGの場合でもシンプルに見せることができますし、コードもシンプルになりますのでとても便利です。

    1from airflow.utils.task_group import TaskGroup
    2
    3start = DummyOperator(task_id='start', dag=dag)
    4with TaskGroup("step1") as step1:
    5    task1 = DummyOperator(task_id='task1', dag=dag)
    6    task2 = DummyOperator(task_id='task2', dag=dag)
    7    task3 = DummyOperator(task_id='task3', dag=dag)
    8    task1 >> task2 >> task3
    9
    10with TaskGroup("step2") as step2:
    11    task4 = DummyOperator(task_id='task4', dag=dag)
    12    task5 = DummyOperator(task_id='task5', dag=dag)
    13    task4 >> task5
    14
    15end = DummyOperator(task_id='end', dag=dag)
    16start >> step1 >> step2 >> end

    まとめ

    今回は、Airflowでよく使うOperatorを紹介しました。
    他にも、SlackWebhookOperator、EmailOperator、GCSToBigQueryOperator、ComputeEngineStartInstanceOperator、EC2StopInstanceOperatorなどとてもたくさんのOperatorがあります。できることが山ほどあって楽しいですね。
    適切なOperatorを選んで構築することで、タスクの可読性や保守性が大幅に向上します。
    Airflowを使うときは、やりたいことにあったOperatorを選び、効率的なワークフローを構築してみてください。

    新田(エンジニア)

    新田(エンジニア)

    おすすめ記事