• トップ
  • ブログ一覧
  • AirflowのVariableとXComsを使ってみよう
  • AirflowのVariableとXComsを使ってみよう

    新田(エンジニア)新田(エンジニア)
    2024.01.23

    IT技術

    はじめに

    新田新田
    こんにちは、分析基盤や分析のお仕事をやっている新田です。
    Apache Airflowはワークフローを管理するオープンソースのプラットフォームで、分析基盤でよく使われます。
    この記事では、少し複雑なタスクを定義するときに必要になるVariableとXComsを使ってみました。
    この2つは、公式ドキュメントでもよく似たものとして紹介されています。

    Variable

    Variableは、Airflowのグローバルなkey/valueストアのようなものです。
    使い方はとってもシンプルです。

    1from airflow.models import Variable
    2
    3# 値のセット
    4Variable.set("sample_key", 12345)
    5
    6# 値の取得(デフォルト値も指定できます)
    7sample_value = Variable.get("sample_key", default_var=None)

    また、Jinjaテンプレートからも取得することができます。

    1from airflow.operators.bash_operator import BashOperator
    2
    3sample_task = BashOperator(
    4    task_id="sample_task",
    5    bash_command="echo {{ var.value.sample_key }}",
    6    dag=dag
    7)

    それだけではありません。AirflowのWebUIからも値を確認、書き換え、追加することができます。便利ですね。
    これらのVariableは、どのDAGからも同じようにアクセスできるグローバルなものです。

    XComs

    Variableがグローバルなkey/valueストアだったのに対して、XComs(Cross Communicationsの略)は同一DAG内のタスク同士でデータをやり取りします。
    値のセットと取得をするサンプルコードです。値のセットは、PythonOperatorを利用することが多いと思います。
    以下はPythonOperatorで値をセットして、また別のPythonOperatorで値を取得する例です。

    1def push_sample(**kwargs):
    2    ti = kwargs["ti"]
    3    # ここで何かする
    4    value = 12345
    5    # 値のセット
    6    task_instance.xcom_push(key="sample_key", value=value)
    7
    8def pull_sample(**kwargs):
    9    ti = kwargs["ti"]
    10    # 値の取得
    11    sample_value = task_instance.xcom_pull(key="sample_key", task_ids="push_task")
    12    # 取得した値を利用して何かする
    13
    14with DAG...
    15    push_task = PythonOperator(
    16        task_id="push_task", python_callable=push_sample, dag=dag
    17    )
    18    pull_task = PythonOperator(
    19        task_id="pull_task", python_callable=pull_sample, dag=dag
    20    )
    21    push_task >> pull_task

    もちろん、Jinjaテンプレートからも値を取得することができます。

    1{{ ti.xcom_pull(task_ids='push_task', key='sample_key') }}

    またAirflowUIからも値を確認することができます。

    まとめ

    VariableやXComsを利用すれば、条件によってタスクを変化させたり、何かを調べて、その調べた結果を利用してまた何かするといったような複雑なタスクを定義することができます。
    Variable、XComs自体はシンプルな機能ですので、試してみてください。

    新田(エンジニア)

    新田(エンジニア)

    おすすめ記事