# 직전 DAG Run의 Task 상태에 따라 Task 실행 여부를 결정하기
# Graph View
다음처럼 간단한 Task 의존성을 가지는 DAG을 작성해볼 것입니다.
간단해 보이지만 하나 다른 점은 이전 DAG Run의 Task Instance의 상태가 성공인 경우에만 현재 DAG Run의 Task Instance을 실행한다는 것입니다. Grahp View에서는 이러한 내용이 잘 보이지 않으니 Tree View를 살펴보겠습니다.
# Tree View
작성할 DAG을 실행하면 다음과 같은 Tree View를 얻게 됩니다.
자세히 살펴보면, 첫 번째 DAG Run의 두 번째 Task Instnace가 실패한 상태이기 때문에 두 번째 DAG Run의 두 번째 Task Instance가 실행되지 않은 상태임을 알 수 있습니다. (세 번째 Task Instance 역시 두 번째 Task Instance에 의존이 있기 때문에 실행되지 않았습니다.)
# Code
from datetime import timedelta
from time import sleep
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
with DAG(
dag_id="05_conditional_tasks_with_depends_on_past",
description="Task 의존성에 depends_on_past를 통해 조건문을 가지는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
"retry_delay": timedelta(minutes=1),
},
start_date=days_ago(2),
schedule_interval=timedelta(days=1),
tags=["examples", "01_writing_various_task_flows"],
) as dag:
def dump() -> None:
sleep(3)
def must_fail() -> None:
raise
t1 = PythonOperator(task_id="task_1", python_callable=dump)
t2 = PythonOperator(
task_id="task_2", python_callable=must_fail, depends_on_past=True
)
t3 = PythonOperator(task_id="task_3", python_callable=dump)
t1 >> t2 >> t3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
PythonOperator
의depends_on_past
파라미터 값을True
로 넘겨줍니다.depends_on_past
의 기본 값은False
입니다.PythonOperator
뿐 아니라 제공되는 모든Operator
에depends_on_past
가 존재합니다.
TIP
depends_on_past=True
를 다음처럼 default_args
에 넣어주면, 모든 Task에 대해 직전 DAG Run의 Task 각각의 상태에 따라 실행할 수 있게 됩니다.
with DAG(
...
default_args={
...
"depends_on_past": True
},
...
) as dag:
...
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9