# DAG에서 다른 DAG Run을 트리거하기

# 사전 준비

다른 DAG에 의해 트리거될 DAG을 먼저 다음처럼 작성합니다. 트리거될 DAG은 아래 Graph View 이미지처럼 아주 간단한 Task 의존성을 가지고 있습니다.

image-20220123162512663

코드는 다음과 같습니다.

from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone

with DAG(
    dag_id="00_to_be_triggered_by_other_dags",
    description="다른 DAG에 의해 Trigger 될 DAG 예제입니다.",
    default_args={
        "owner": "heumsi",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval=None,  # None으로 두었기 때문에 직접 Trigger 할 때만 DAG Run이 생성됩니다.
    tags=["examples", "02_dependencies_between_dags"],
) as dag:

    def dump() -> None:
        sleep(3)

    task_1 = PythonOperator(task_id="task_1", python_callable=dump)
    task_2 = PythonOperator(task_id="task_2", python_callable=dump)
    task_3 = PythonOperator(task_id="task_3", python_callable=dump)

    task_1 >> task_2 >> task_3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

작성 후 Web UI에서 이 DAG을 ON 상태로 둡니다.

image-20220123162839172

# Graph View

다음과 같은 간단한 Task 의존성을 가지는 DAG을 작성해봅시다.

image-20220123162936827

이 때 trigger_task 는 앞서 "사전 준비"에서 작성한 DAG Run을 Trigger하게 됩니다.

# Code






 



















 
 
 
 
 
 
 
 
 
 
 
 




from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pendulum.tz.timezone import Timezone

with DAG(
    dag_id="01_trigger_dagrun",
    description="다른 DAG의 DAG Run을 Trigger하는 DAG 예제입니다",
    default_args={
        "owner": "heumsi",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@once",
    tags=["examples", "02_dependencies_between_dags"],
) as dag:

    def dump() -> None:
        sleep(3)

    task_1 = PythonOperator(task_id="task_1", python_callable=dump)
    trigger_task = TriggerDagRunOperator(
        task_id="trigger_task",
        trigger_dag_id="00_to_be_triggered_by_other_dags",
        conf=None,
        trigger_run_id=None,
        execution_date=None,
        reset_dag_run=False,
        wait_for_completion=False,
        poke_interval=60,
        allowed_states=["success"],
        failed_states=None,
    )
    task_3 = PythonOperator(task_id="task_3", python_callable=dump)

    task_1 >> trigger_task >> task_3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  • TriggerDagRunOperator 를 통해 다른 DAG Run을 Trigger하는 Task Instance를 작성합니다.
    • trigger_dag_id 파라미터에 Trigger할 DAG의 dag_id를 넘깁니다.
    • trigger_run_id 파라미터를 통해 Trigger된 DAG Run의 run_id 를 지정할 수 있습니다. None 으로 넘기는 경우, 자동으로 지정됩니다.
    • 나머지는 모두 기본 값 그대로 적었습니다. 자세한 내용은 공식 문서 (opens new window)를 확인해주세요.

# Web UI

작성한 DAG Run이 실행되어 모든 Task Instance가 성공했습니다.

image-20220123163031694

이 때 TriggerDagRunOperator 로 작성한 Task가 Trigger한 DAG Run 역시 다음처럼 실행되어 성공한 것을 볼 수 있습니다.

image-20220123163048155

Run Id를 살펴보면 다음처럼 자동으로 지정된 것을 볼 수 있습니다.

image-20220123163102483

현재 배포한 Airflow 내 DAG 간 의존성 현황은 Web UI 상단 메뉴의 Browse - DAG Dependencies 탭에서 다음처럼 볼 수 있습니다.

image-20220123163941903

Last Updated: 3/1/2022, 1:26:42 PM