# DAG에서 다른 DAG Run의 Task를 기다리기

# 사전 준비

직전 파트에서 만든 01_trigger_dagrun DAG이 있어야 합니다. 이에 대한 내용은 직전 파트에서 확인해주세요.

01_trigger_dagrun이 이미 있고, DAG Run이 이미 만들어진 상황이라면 Web UI에서 01_trigger_dagrun을 삭제해주세요. 시간이 조금 지나면 01_trigger_dagrun 은 OFF된 상태로 다시 Web UI에 등장할 것입니다.

위 준비를 모두 마치면 Web UI는 다음과 같이 01_trigger_dagrun 는 OFF 상태고, DAG Run 기록이 없는 상태여야 합니다. 그리고 00_to_be_triggered_by_other_dags 는 ON 상태여야 합니다.

img.png

# Graph View

다음과 같은 간단한 Task 의존성을 가지는 DAG을 작성해봅시다.

img_6.png

이 때 wait_for_01_trigger_dagrun.task_301_trigger_dagrun DAG Run의 task_3 Task Instance가 성공 상태가 되기까지 기다린 후 실행됩니다. 즉, 01_trigger_dagrun DAG 내 Task Instance가 실행이 완료되기까지 기다리게 됩니다.

# Code






 



















 
 
 
 
 
 
 
 
 
 




from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from pendulum.tz.timezone import Timezone

with DAG(
    dag_id="02_waiting_tasks_in_another_dagrun",
    description="다른 DAG Run의 Task를 기다리는 DAG 예제입니다",
    default_args={
        "owner": "heumsi",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@once",
    tags=["examples", "02_dependencies_between_dags"],
) as dag:

    def dump() -> None:
        sleep(3)

    task_1 = PythonOperator(task_id="task_1", python_callable=dump)
    wait_for_01_trigger_dagrun_task_3 = ExternalTaskSensor(
        task_id="wait_for_01_trigger_dagrun.task_3",
        external_dag_id="01_trigger_dagrun",
        external_task_id="task_3",
        allowed_states=["success"],
        failed_states=None,
        execution_delta=None,
        execution_date_fn=None,
        check_existence=False,
    )
    task_3 = PythonOperator(task_id="task_3", python_callable=dump)

    task_1 >> wait_for_01_trigger_dagrun_task_3 >> task_3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
  • ExternalTaskSensor 를 통해 다른 DAG Run 내 Task Instance가 완료되기까지 기다리는 Task Instnace를 작성합니다.
    • external_dag_id 파라미터에 다른 DAG의 dag_id 를 넘깁니다.
    • external_task_id 파라미터에 external_dag_id로 지정한 DAG 내 기다릴 Task의 task_id를 넘깁니다.
    • allowed_states 파라미터에 기다릴 Task Instance의 기대하는 상태를 넘깁니다.
    • 나머지는 모두 기본 값 그대로 적었습니다. 자세한 내용은 공식 문서 (opens new window)를 확인해주세요.

# Web UI

작성한 DAG Run이 실행되면 다음처럼 ExternalTaskSensor 로 작성한 Task Instance에서 실행 중인 상태로 다른 DAG Run이 성공 상태로 되기까지 기다립니다.

img_2.png

이제 기다리고 있는 다른 DAG인 01_trigger_dagrun DAG을 ON 상태로 두어 실행해봅시다.

img_3.png

img_4.png

위처럼 01_trigger_dagrun DAG Run 이 실행 되고 성공한 상태가 되면, 다음처럼 기존의 waiting_tasks_in_another_dagrun DAG Run에서 기다림이 끝나고, 그 다음 Task들을 정상적으로 실행해나갑니다.

img_5.png

Last Updated: 3/1/2022, 1:26:42 PM