# XComs - Task 간 데이터 주고받기
Airflow의 DAG 내에서 각각 Task Instance들의 실행은 격리되어 있습니다. 따라서 기본적으로 Task Instance 간 메모리를 공유하고 있지 않으며, 데이터를 주고 받으려면 별도의 동작이 필요합니다.
XComs는 Cross Communications의 줄임말로 Task Instance 간 데이터를 주고 받고싶을 때 사용하는 Airflow 기능입니다.
WARNING
XComs의 저장 범위는 DAG Run 내에서만 유효합니다. 서로 다른 DAG Run에서는 XComs를 통한 데이터 주고 받기가 불가능합니다.
# Grpah View
다음과 같이 XCom 을 활용한 Task 의존성을 가지는 DAG을 작성해볼 것 입니다.
# Code
전체 코드는 다음과 같습니다.
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="01_xcoms",
description="XCom을 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "05_etc_features"],
) as dag:
def push_value_to_xcom(**kwargs) -> None:
task_instance: TaskInstance = kwargs["task_instance"]
task_instance.xcom_push(key="hello", value="world")
def return_value_to_xcom() -> str:
return "hello world"
def pull_value_from_xcom(**kwargs) -> None:
task_instance: TaskInstance = kwargs["task_instance"]
value_from_pushed = task_instance.xcom_pull(key="hello")
print(value_from_pushed) # world
value_from_returned = task_instance.xcom_pull(task_ids="return_value_to_xcom")
print(value_from_returned) # hello world
value_from_pushed_through_template = task_instance.xcom_pull(key="welcome")
print(value_from_pushed_through_template) # airflow
task_1 = PythonOperator(
task_id="push_value_to_xcom", python_callable=push_value_to_xcom
)
task_2 = PythonOperator(
task_id="return_value_to_xcom", python_callable=return_value_to_xcom
)
task_3 = BashOperator(
task_id="push_value_to_xcom_through_template",
bash_command='echo "{{ task_instance.xcom_push(key="welcome", value="airflow") }}"',
)
task_4 = PythonOperator(
task_id="pull_value_from_xcom", python_callable=pull_value_from_xcom
)
task_1 >> task_2 >> task_3 >> task_4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
XComs를 활용하여 Task간 넘기는 방법은 다음처럼 이루어집니다.
- Task A Instance에서 XCom에 데이터를 저장한다. (push)
- Task B Instance에서 XCom에 저장된 데이터를 가져온다. (pull)
# XCom에 데이터 저장하기
위 코드에서 XCom에 데이터를 저장하는 부분은 다음과 같습니다.
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="01_xcoms",
description="XCom을 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "05_etc_features"],
) as dag:
def push_value_to_xcom(**kwargs) -> None:
task_instance: TaskInstance = kwargs["task_instance"]
task_instance.xcom_push(key="hello", value="world")
def return_value_to_xcom() -> str:
return "hello world"
def pull_value_from_xcom(**kwargs) -> None:
task_instance: TaskInstance = kwargs["task_instance"]
value_from_pushed = task_instance.xcom_pull(key="hello")
print(value_from_pushed) # world
value_from_returned = task_instance.xcom_pull(task_ids="return_value_to_xcom")
print(value_from_returned) # hello world
value_from_pushed_through_template = task_instance.xcom_pull(key="welcome")
print(value_from_pushed_through_template) # airflow
task_1 = PythonOperator(
task_id="push_value_to_xcom", python_callable=push_value_to_xcom
)
task_2 = PythonOperator(
task_id="return_value_to_xcom", python_callable=return_value_to_xcom
)
task_3 = BashOperator(
task_id="push_value_to_xcom_through_template",
bash_command='echo "{{ task_instance.xcom_push(key="welcome", value="airflow") }}"',
)
task_4 = PythonOperator(
task_id="pull_value_from_xcom", python_callable=pull_value_from_xcom
)
task_1 >> task_2 >> task_3 >> task_4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
- XCom에 데이터를 저장하는 첫 번째 방법은
kwargs
에서task_instance
를 가져와xcom_push
메서드를 사용하는 것입니다.21-23
라인이 이 방법을 사용합니다.- Xcom에 데이터를 저장할 때는 Key-Value 형태로 합니다.
- XCom에 데이터를 저장하는 두 번째 방법은 Callable 객체의 리턴 값을 활용하는 것입니다.
25-26
라인이 이 방법을 사용합니다.- 리턴 값은 Key-Value 형태에서 '값'이 되며, '키' 값은 자동적으로
return_value
라는 값이 됩니다.
- XCom에 데이터를 저장하는 세 번째 방법은 템플릿 문법을 활용하는 것입니다.
48-51
라인이 이 방법을 사용합니다.task_instance
는 템플릿 변수로 제공 되므로안에서 사용할 수 있습니다.
TIP
PythtonOperator
의 경우 반환 값이 XCom로 푸시됩니다.
BashOperator
의 경우 stdout의 가장 마지막 줄만 XCom으로 푸시됩니다.
(이 기능을 끄고 싶다면 BashOperator
의 생성자 파라미터 중 do_xcom_push=False
로 지정하면 됩니다.)
두 경우 모두 XCom의 Key는 return_value
로 자동 지정됩니다.
# XCom에 저장된 데이터 불러오기
위 코드에서 XCom에 저장된 데이터를 불러오는 부분은 다음과 같습니다.
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="01_xcoms",
description="XCom을 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "05_etc_features"],
) as dag:
def push_value_to_xcom(**kwargs) -> None:
task_instance: TaskInstance = kwargs["task_instance"]
task_instance.xcom_push(key="hello", value="world")
def return_value_to_xcom() -> str:
return "hello world"
def pull_value_from_xcom(**kwargs) -> None:
task_instance: TaskInstance = kwargs["task_instance"]
value_from_pushed = task_instance.xcom_pull(key="hello")
print(value_from_pushed) # world
value_from_returned = task_instance.xcom_pull(task_ids="return_value_to_xcom")
print(value_from_returned) # hello world
value_from_pushed_through_template = task_instance.xcom_pull(key="welcome")
print(value_from_pushed_through_template) # airflow
task_1 = PythonOperator(
task_id="push_value_to_xcom", python_callable=push_value_to_xcom
)
task_2 = PythonOperator(
task_id="return_value_to_xcom", python_callable=return_value_to_xcom
)
task_3 = BashOperator(
task_id="push_value_to_xcom_through_template",
bash_command='echo "{{ task_instance.xcom_push(key="welcome", value="airflow") }}"',
)
task_4 = PythonOperator(
task_id="pull_value_from_xcom", python_callable=pull_value_from_xcom
)
task_1 >> task_2 >> task_3 >> task_4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
- XCom에 데이터를 불러오는 방법은
kwargs
에서task_instance
를 가져와xcom_pull
메서드를 사용하는 것입니다.key
파라미터에 불러올 데이터의 Key를 넘겨주면 Value를 받을 수 있습니다.- XCom에 데이터를
return
으로 저장했을 시,xcom_pull
메서드에서task_ids
파라미터에 데이터를 저장한task_id
를 넘겨주어return
으로 저장한 데이터를 받아올 수 있습니다. (34
번 라인)
# Web UI
실행 결과를 확인해보면 다음과 같습니다.
pull_value_from_xcom
Task Instance의 실행 로그
XCom에 저장된 데이터는 웹 UI에서 상단 메뉴의 Admin - XComs에서 다음처럼 확인할 수 있습니다.
WARNING
XCom는 Task 간의 간단한 데이터를 주고 받기 위한 용도이며, 대용량 파일이나 데이터를 주고받기에는 적절치 않습니다. XCom은 Database에 저장되는데, XCom으로 주고받을 수 있는 데이터 사이즈 한계는 Database에 따라 다음과 같습니다.
- SQLite: 2 Go
- Postgres: 1 Go
- MySQL: 64 KB