新田(エンジニア)
AirflowのVariableとXComsを使ってみよう
新田(エンジニア)
2024.01.23
IT技術
はじめに
新田
こんにちは、分析基盤や分析のお仕事をやっている新田です。
Apache Airflowはワークフローを管理するオープンソースのプラットフォームで、分析基盤でよく使われます。
この記事では、少し複雑なタスクを定義するときに必要になるVariableとXComsを使ってみました。
この2つは、公式ドキュメントでもよく似たものとして紹介されています。
Apache Airflowはワークフローを管理するオープンソースのプラットフォームで、分析基盤でよく使われます。
この記事では、少し複雑なタスクを定義するときに必要になるVariableとXComsを使ってみました。
この2つは、公式ドキュメントでもよく似たものとして紹介されています。
Variable
Variableは、Airflowのグローバルなkey/valueストアのようなものです。
使い方はとってもシンプルです。
1from airflow.models import Variable
2
3# 値のセット
4Variable.set("sample_key", 12345)
5
6# 値の取得(デフォルト値も指定できます)
7sample_value = Variable.get("sample_key", default_var=None)
また、Jinjaテンプレートからも取得することができます。
1from airflow.operators.bash_operator import BashOperator
2
3sample_task = BashOperator(
4 task_id="sample_task",
5 bash_command="echo {{ var.value.sample_key }}",
6 dag=dag
7)
それだけではありません。AirflowのWebUIからも値を確認、書き換え、追加することができます。便利ですね。
これらのVariableは、どのDAGからも同じようにアクセスできるグローバルなものです。
XComs
Variableがグローバルなkey/valueストアだったのに対して、XComs(Cross Communicationsの略)は同一DAG内のタスク同士でデータをやり取りします。
値のセットと取得をするサンプルコードです。値のセットは、PythonOperatorを利用することが多いと思います。
以下はPythonOperatorで値をセットして、また別のPythonOperatorで値を取得する例です。
1def push_sample(**kwargs):
2 ti = kwargs["ti"]
3 # ここで何かする
4 value = 12345
5 # 値のセット
6 task_instance.xcom_push(key="sample_key", value=value)
7
8def pull_sample(**kwargs):
9 ti = kwargs["ti"]
10 # 値の取得
11 sample_value = task_instance.xcom_pull(key="sample_key", task_ids="push_task")
12 # 取得した値を利用して何かする
13
14with DAG...
15 push_task = PythonOperator(
16 task_id="push_task", python_callable=push_sample, dag=dag
17 )
18 pull_task = PythonOperator(
19 task_id="pull_task", python_callable=pull_sample, dag=dag
20 )
21 push_task >> pull_task
もちろん、Jinjaテンプレートからも値を取得することができます。
1{{ ti.xcom_pull(task_ids='push_task', key='sample_key') }}
またAirflowUIからも値を確認することができます。
まとめ
VariableやXComsを利用すれば、条件によってタスクを変化させたり、何かを調べて、その調べた結果を利用してまた何かするといったような複雑なタスクを定義することができます。
Variable、XComs自体はシンプルな機能ですので、試してみてください。
ライトコードでは、エンジニアを積極採用中!
ライトコードでは、エンジニアを積極採用しています!社長と一杯しながらお話しする機会もご用意しております。そのほかカジュアル面談等もございますので、くわしくは採用情報をご確認ください。
採用情報へ
新田(エンジニア)
Show more...競馬が好きです。