# 직전 DAG Run의 상태에 따라 Task 실행 여부를 결정하기
# Graph View
다음처럼 간단한 Task 의존성을 가지는 DAG을 작성해볼 것입니다.
간단해 보이지만 하나 다른 점은 이전 DAG Run의 상태가 성공인 경우에만 현재 DAG Run을 실행한다는 것입니다. Grahp View에서는 이러한 내용이 잘 보이지 않으니 Tree View를 살펴보겠습니다.
TIP
직전 내용과 다른 점은, 직전 내용은 이전 DAG Run의 Task Instance 상태가 주요 요인이었다면, 이번 내용은 이전 DAG Run 그 자체의 상태가 주요 요인이라는 것입니다. 즉 실행 단위가 Task Instance가 아닌 DAG Run 입니다.
# Tree View
작성할 DAG을 실행하면 다음과 같은 Tree View를 얻게 됩니다.
자세히 살펴보면, 첫 번째 DAG Run이 실패한 상태고, 두 번째 DAG Run의 실행 중 상태이지만, 어떠한 Task Instance도 실행되지 않은 것을 볼 수 있습니다. 두 번째 DAG Run의 모든 Task Instance가 실행되지 않은 이유는 직전 DAG Run이 실패한 상태이기 때문입니다.
# Code
from datetime import timedelta
from time import sleep
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
with DAG(
dag_id="06_conditional_tasks_with_wait_for_downstream",
description="Task 의존성에 with_wait_for_downstream을 통해 조건문을 가지는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
"retry_delay": timedelta(minutes=1),
},
start_date=days_ago(2),
schedule_interval=timedelta(days=1),
tags=["examples", "01_writing_various_task_flows"],
) as dag:
def dump() -> None:
sleep(3)
def must_fail() -> None:
raise
t1 = PythonOperator(
task_id="task_1", python_callable=dump, wait_for_downstream=True
)
t2 = PythonOperator(task_id="task_2", python_callable=must_fail)
t3 = PythonOperator(task_id="task_3", python_callable=dump)
t1 >> t2 >> t3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
PythonOperator
의wait_for_downstream
파라미터 값을True
로 넘겨줍니다.wait_for_downstream
의 기본 값은False
입니다.PythonOperator
뿐 아니라 제공되는 모든Operator
에depends_on_past
가 존재합니다.
TIP
wait_for_downstream=True
를 다음처럼 default_args
에 넣어주면, 모든 Task에 대해 직전 DAG Run의 Task 각각의 상태에 따라 실행할 수 있게 됩니다.
with DAG(
...
default_args={
...
"wait_for_downstream": True
},
...
) as dag:
...
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9