# Operator와 함께 사용하기
Taskflow API와 기존의 방식인 Operator를 같이 사용하는 방법을 알아봅시다.
# Graph View
다음과 같은 Task 의존성을 갖는 DAG을 작성할 것입니다.
# Code
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
@dag(
dag_id="03_with_operators",
description="Operator와 Taskflow API를 같이 활용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "06_taskflow_api"],
)
def main():
def get_first_word_from_param_and_return(param: str) -> str:
"""문자열 param의 첫 번째 단어를 출력하고, param을 반환합니다."""
first_word = param.split()[0]
print(first_word)
return param
@task
def get_last_word_from_param_and_return(param: str) -> str:
"""문자열 param의 마지막 단어를 출력하고, param을 반환합니다."""
last_word = param.split()[-1]
print(last_word)
return param
def print_param(param: str) -> None:
"""문자열 param을 출력합니다"""
print(param)
task_1 = PythonOperator(
task_id="get_first_word_from_param_and_return",
python_callable=get_first_word_from_param_and_return,
op_args=[
"welcome to airflow tutorials",
],
)
task_2 = get_last_word_from_param_and_return(task_1.output)
task_3 = PythonOperator(
task_id="print_param", python_callable=print_param, op_args=[task_2]
)
task_1 >> task_2 >> task_3
dag = main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
- 전체적으로 Operator를 사용하는 방식과 큰 차이가 없습니다.
27-33
번 라인에@task
데코레이터로 Task를 정의한 부분이 있습니다.46
번 라인에 이 Task를 Task Instance로 정의합니다.- 이 때
task_1.output
으로task_1
의return
값을 가져올 수 있습니다.
- 이 때
51
번 라인에는 Task 간 의존 관계를 정의합니다. 크게 다른 부분은 없습니다.
# Web UI & Logs
DAG을 실행하면 다음과 같은 화면을 얻습니다.
각 Task Instance의 로그를 살펴보면 다음과 같습니다.
# get_first_word_from_param_and_return
[2022-01-31, 15:52:14 UTC] {logging_mixin.py:109} INFO - welcome
1
# get_last_word_from_param_and_return
[2022-01-31, 15:57:04 UTC] {logging_mixin.py:109} INFO - tutorials
1
# print_param
[2022-01-31, 15:57:07 UTC] {logging_mixin.py:109} INFO - welcome to airflow tutorials
1