# 조건에 따라 다른 Task 의존성 작성하기
# Graph View
다음처럼 특정 조건(Branch)에 따라 분기 처리되어 실행되는 Task Instance가 달라지는 의존성을 가지는 DAG을 작성해봅시다.

# Code
import random
from datetime import datetime, timedelta
from time import sleep
from typing import List
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="04_conditional_tasks_with_branch",
description="Task 의존성에 Branch를 통해 조건문을 가지는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 1,
"retry_delay": timedelta(minutes=1),
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "01_writing_various_task_flows"],
) as dag:
def dump() -> None:
sleep(3)
def select_random(task_ids: List[str]) -> str:
return random.choice(task_ids)
t1 = PythonOperator(task_id="task_1", python_callable=dump)
t2 = PythonOperator(task_id="task_2", python_callable=dump)
t3 = PythonOperator(task_id="task_3", python_callable=dump)
branch = BranchPythonOperator(
task_id="branch",
python_callable=select_random,
op_kwargs={
"task_ids": [t2.task_id, t3.task_id],
},
)
t1 >> branch >> [t2, t3]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
32-38라인에BranchPythonOperator로 Task Instance를 정의합니다.select_random함수를task_ids의 파라미터 값인[t2.task_id, t3.task_id]를 함께 넘기며 호출합니다.
26-27라인의select_random함수는task_ids리스트 값중 하나를 랜덤으로 선택해 반환합니다.- 즉
t2.task_id,t3.task_id중 하나가 선택됩니다. - 선택된
task가 실행됩니다.
- 즉
40라인에BranchPythonOperator로 정의한 Task Instance 뒤에 분기에 따라 실행할 Task Instance를 담은 리스트가 있습니다.
조건에 따라 다른 Task 의존성을 갖는 방법을 정리해보면 다음과 같습니다.
- 조건에 따라 분기 처리해야 할 Task Instance들을
List에 담아BranchPythonOperatorTask Instance에 의존하도록 설정합니다., BranchPythonOperatorTask Instance는 이List에 담긴 Task Instance 중 하나의 Task Instance의task_id를 반환합니다.
# Web UI
