# Variables - 전역 변수 사용하기
Variables는 어느 DAG에서든 불러올 수 있는 Airflow에서 제공하는 일종의 전역 변수입니다.
# Web UI
먼저 Variables로 데이터를 저장해봅시다.
웹 UI에서 상단 메뉴의 Admin - Variables 페이지로 진입합니다.
진입한 페이지에서 + 버튼을 클릭합니다.
Key와 Val, 그리고 Description 폼에 다음과 같이 입력합니다.
Save 버튼을 클릭하면 다음처럼 Variable이 생성됩니다.
다시 + 버튼을 눌러 하나 더 만들어봅시다. 이번엔 다음처럼 Val 값으로 JSON 데이터를 입력합니다.
Save 버튼으로 Variable을 생성합니다.
총 2개의 Variable을 생성했습니다.
TIP
Variable은 위처럼 웹 UI에서 만들 수도 있지만, CLI나 환경 변수로도 만들 수 있습니다. 이에 대한 자세한 내용은 아래 공식 문서를 확인해주세요.
# Graph View
다음과 같이 Variable 을 활용한 Task 의존성을 가지는 DAG을 작성해볼 것 입니다.
# Code
전체 코드는 다음과 같습니다.
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="02_variables",
description="Variables를 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "05_etc_features"],
) as dag:
def print_variables() -> None:
hello = Variable.get(key="hello")
print(hello) # world
not_existing_key = Variable.get(
key="not_existing_key", default_var="default value"
)
print(not_existing_key) # default value
json_data = Variable.get(key="json_data", deserialize_json=True)
print(json_data["welcome"]) # aiflow
print(json_data["author"]) # heumsi
def print_variables_through_template(
hello: str,
author: str,
not_existing_key: str,
) -> None:
print(hello) # world
print(author) # heumsi
print(not_existing_key) # default value
task_1 = PythonOperator(task_id="print_variables", python_callable=print_variables)
task_2 = PythonOperator(
task_id="print_variables_through_templates",
python_callable=print_variables_through_template,
op_kwargs={
"hello": "{{ var.value.hello }}",
"author": "{{ var.json.json_data.author }}",
"not_existing_key": "{{ var.value.get('not_existing_key', 'default value') }}",
},
)
task_1 >> task_2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
코드에서 Variable을 불러오는 방법은 2가지 방법이 있는데 각 부분을 하나씩 알아봅시다.
# Variable.get()
으로 불러오기
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="02_variables",
description="Variables를 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "05_etc_features"],
) as dag:
def print_variables() -> None:
hello = Variable.get(key="hello")
print(hello) # world
not_existing_key = Variable.get(
key="not_existing_key", default_var="default value"
)
print(not_existing_key) # default value
json_data = Variable.get(key="json_data", deserialize_json=True)
print(json_data["welcome"]) # aiflow
print(json_data["author"]) # heumsi
def print_variables_through_template(
hello: str,
author: str,
not_existing_key: str,
) -> None:
print(hello) # world
print(author) # heumsi
print(not_existing_key) # default value
task_1 = PythonOperator(task_id="print_variables", python_callable=print_variables)
task_2 = PythonOperator(
task_id="print_variables_through_templates",
python_callable=print_variables_through_template,
op_kwargs={
"hello": "{{ var.value.hello }}",
"author": "{{ var.json.json_data.author }}",
"not_existing_key": "{{ var.value.get('not_existing_key', 'default value') }}",
},
)
task_1 >> task_2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Variable.get()
메서드로 Variable을 불러옵니다.key
파라미터로 불러올 Variable의 Key를 넘겨줘야 합니다.default_var
파라미터로key
에 해당하는 Variable이 없는 경우의 기본 값을 지정할 수 있습니다.deserialize_json
파라미터를True
로 주면 Val이 JSON 데이터인 경우, 이를 파싱하여 파이썬Dict
자료구조 형태로 받아옵니다.
# 템플릿 문법으로 불러오기
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="02_variables",
description="Variables를 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "05_etc_features"],
) as dag:
def print_variables() -> None:
hello = Variable.get(key="hello")
print(hello) # world
not_existing_key = Variable.get(
key="not_existing_key", default_var="default value"
)
print(not_existing_key) # default value
json_data = Variable.get(key="json_data", deserialize_json=True)
print(json_data["welcome"]) # aiflow
print(json_data["author"]) # heumsi
def print_variables_through_template(
hello: str,
author: str,
not_existing_key: str,
) -> None:
print(hello) # world
print(author) # heumsi
print(not_existing_key) # default value
task_1 = PythonOperator(task_id="print_variables", python_callable=print_variables)
task_2 = PythonOperator(
task_id="print_variables_through_templates",
python_callable=print_variables_through_template,
op_kwargs={
"hello": "{{ var.value.hello }}",
"author": "{{ var.json.json_data.author }}",
"not_existing_key": "{{ var.value.get('not_existing_key', 'default value') }}",
},
)
task_1 >> task_2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
47-49
라인처럼 템플릿 문법으로 불러올 수 있습니다.
# Log
print_variables
Task Instance의 실행 로그는 다음과 같습니다.
print_variables_through_templates
Task Instance의 실행 로그는 다음과 같습니다.