# 커스텀하게 Task간 의존성 작성하기
Taskflow API를 사용하면 Task 간 의존 관계를 명시적으로 정의해주지 않아도 자동으로 파악하여 설정되었습니다. 이는 직접 정의해줄 필요가 없어서 편리할 수 있는 반면, 의존 관계가 명시적이지 않아 불편할 수도 있습니다.
이번에는 Taskflow API에서도 명시적으로 Task간 의존 관계를 정의하는 방법에 대해 알아봅시다.
# Graph View
다음과 같은 Task 의존성을 갖는 DAG을 작성할 것입니다.
# Code
from datetime import datetime
from airflow.decorators import dag, task
from pendulum.tz.timezone import Timezone
@dag(
dag_id="02_custom_dependencies",
description="Taskflow API의 의존성을 커스텀하게 작성하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "06_taskflow_api"],
)
def main():
@task
def print_hello() -> None:
"""hello를 출력합니다."""
print("hello")
@task
def get_first_word_from_param_and_return(param: str) -> str:
"""문자열 param의 첫 번째 단어를 출력하고, param을 반환합니다."""
first_word = param.split()[0]
print(first_word)
return param
@task
def get_last_word_from_param(param: str) -> None:
"""문자열 param의 마지막 단어를 출력합니다."""
last_word = param.split()[-1]
print(last_word)
task_1 = print_hello()
task_2 = get_first_word_from_param_and_return("welcome to airflow tutorials")
task_3 = get_last_word_from_param(task_2)
task_1 >> task_2 >> task_3
dag = main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
40-42
라인을 보면@task
를 붙인 함수를 호출한 뒤 그 반환 값을 저장합니다.- 이 값은 Task Instance가 됩니다.
44
라인에 이 Task Instnace간 의존 관계를 정의합니다.
# Web UI & Logs
DAG을 실행하면 다음과 같은 화면을 얻습니다.
각 Task Instance의 로그를 살펴보면 다음과 같습니다.
# print_hello
[2022-01-31, 15:43:04 UTC] {logging_mixin.py:109} INFO - hello
1
# get_first_word_from_param_and_return
[2022-01-31, 15:43:06 UTC] {logging_mixin.py:109} INFO - welcome
1
# get_last_word_from_param
[2022-01-31, 15:43:51 UTC] {logging_mixin.py:109} INFO - tutorials
1
← 개념 Operator와 함께 사용하기 →