# 조건에 따라 다른 Task 의존성 작성하기
# Graph View
다음처럼 특정 조건(Branch)에 따라 분기 처리되어 실행되는 Task Instance가 달라지는 의존성을 가지는 DAG을 작성해봅시다.
# Code
import random
from datetime import datetime, timedelta
from time import sleep
from typing import List
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="04_conditional_tasks_with_branch",
description="Task 의존성에 Branch를 통해 조건문을 가지는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 1,
"retry_delay": timedelta(minutes=1),
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "01_writing_various_task_flows"],
) as dag:
def dump() -> None:
sleep(3)
def select_random(task_ids: List[str]) -> str:
return random.choice(task_ids)
t1 = PythonOperator(task_id="task_1", python_callable=dump)
t2 = PythonOperator(task_id="task_2", python_callable=dump)
t3 = PythonOperator(task_id="task_3", python_callable=dump)
branch = BranchPythonOperator(
task_id="branch",
python_callable=select_random,
op_kwargs={
"task_ids": [t2.task_id, t3.task_id],
},
)
t1 >> branch >> [t2, t3]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
32-38
라인에BranchPythonOperator
로 Task Instance를 정의합니다.select_random
함수를task_ids
의 파라미터 값인[t2.task_id, t3.task_id]
를 함께 넘기며 호출합니다.
26-27
라인의select_random
함수는task_ids
리스트 값중 하나를 랜덤으로 선택해 반환합니다.- 즉
t2.task_id
,t3.task_id
중 하나가 선택됩니다. - 선택된
task
가 실행됩니다.
- 즉
40
라인에BranchPythonOperator
로 정의한 Task Instance 뒤에 분기에 따라 실행할 Task Instance를 담은 리스트가 있습니다.
조건에 따라 다른 Task 의존성을 갖는 방법을 정리해보면 다음과 같습니다.
- 조건에 따라 분기 처리해야 할 Task Instance들을
List
에 담아BranchPythonOperator
Task Instance에 의존하도록 설정합니다., BranchPythonOperator
Task Instance는 이List
에 담긴 Task Instance 중 하나의 Task Instance의task_id
를 반환합니다.