# SubDAG 만들기

WARNING

SubDAG은 현재(Airflow 2.2.3 기준) 일반적으로 사용이 권장되지는 않습니다. (현재는 Task Group 사용을 권장합니다.) 다만 기존에 작성된 DAG 파일을 보게되는 경우 간혹 등장하기 때문에, 한번 둘러본다는 차원으로 보시면 좋습니다.

# Graph View

다음과 같은 Task 의존성을 가지는 DAG을 작성해볼 것입니다.

image-20220123154556951

조금 특이한 점은 가운데에 subdag Task의 색이 다른 Task들과 다르다는 것입니다. 이 Task는 SubDAG 이라 부르는 Task로, 이름 그대로 DAG 안에 존재하는 또 다른 DAG 입니다.

subdag Task는 다음과 같이 또 다른 DAG으로 구성되어 있습니다.

image-20220123154746623

이처럼 SubDAG은 DAG 내 존재하는 하위 DAG입니다. 이제 이를 어떻게 작성하는지 살펴봅시다.

# Code






 







 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


















 
 
 




from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from pendulum.tz.timezone import Timezone


def dump() -> None:
    sleep(3)


def get_subdag(parent_dag_id: str, dag_name: str) -> DAG:
    with DAG(
        dag_id=f"{parent_dag_id}.{dag_name}",
        description=f"{parent_dag_id}의 SubDag 입니다.",
        default_args={
            "owner": "heumsi",
            "retries": 0,
            "retry_delay": timedelta(minutes=1),
        },
        start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
        schedule_interval="@once",
        tags=["examples", "01_writing_various_task_flows"],
    ) as dag:

        sub_task_start = PythonOperator(task_id=f"sub_task_start", python_callable=dump)
        sub_tasks = []
        for i in range(5):
            sub_task = PythonOperator(task_id=f"sub_task_{i}", python_callable=dump)
            sub_tasks.append(sub_task)
        sub_task_end = PythonOperator(task_id=f"sub_task_end", python_callable=dump)

        sub_task_start >> sub_tasks >> sub_task_end

        return dag


dag_id = "08_subdag"

with DAG(
    dag_id=dag_id,
    description="Task 의존성에 SubDAG을 가지는 DAG 예제입니다.",
    default_args={
        "owner": "heumsi",
        "retries": 0,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@once",
    tags=["examples", "01_writing_various_task_flows"],
) as dag:

    task_start = PythonOperator(task_id="task_start", python_callable=dump)
    subdag = SubDagOperator(
        task_id="subdag", subdag=get_subdag(parent_dag_id=dag_id, dag_name="subdag")
    )
    task_end = PythonOperator(task_id="task_end", python_callable=dump)

    task_start >> subdag >> task_end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
  • 56-58 라인에서 SubDagOperator 를 통해 SubDAG을 정의합니다.
    • 이 때 subdag 파라미터를 통해 get_subdag 함수를 호출합니다.
    • subdag 파라미터에 넘길 함수는 DAG 객체를 반환하는 함수여야 합니다.
  • 14-37 라인에 get_subdag 함수가 정의되어 있습니다.
    • 일반적인 DAG 작성과 동일합니다. 이 DAG은 SubDAG이 됩니다.
    • 하나 주의해야할 점은 SubDAG의 dag_id{parent_dag_id}. 를 prefix로 가져야 합니다. 만약 prefix 갖지 않으면 Scheduler에서 DAG 파일을 파싱할 때 에러를 뱉습니다.
    • 이 함수는 DAG 인스턴스를 반환합니다.

WARNING

SubDAG을 사용할 때는 좀 더 주의해야할 사항들이 있습니다. 자세한 내용은 공식 문서 (opens new window)에서 확인해주세요.

# Web UI

image-20220123154845833

image-20220123154859276

image-20220123154910866

Last Updated: 3/1/2022, 1:26:42 PM