# 조건에 따라 다른 Task 의존성 작성하기

# Graph View

다음처럼 특정 조건(Branch)에 따라 분기 처리되어 실행되는 Task Instance가 달라지는 의존성을 가지는 DAG을 작성해봅시다.

image-20220123003919600

# Code







 


















 
 




 
 
 
 
 
 
 

 

import random
from datetime import datetime, timedelta
from time import sleep
from typing import List

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from pendulum.tz.timezone import Timezone

with DAG(
    dag_id="04_conditional_tasks_with_branch",
    description="Task 의존성에 Branch를 통해 조건문을 가지는 DAG 예제입니다.",
    default_args={
        "owner": "heumsi",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@once",
    tags=["examples", "01_writing_various_task_flows"],
) as dag:

    def dump() -> None:
        sleep(3)

    def select_random(task_ids: List[str]) -> str:
        return random.choice(task_ids)

    t1 = PythonOperator(task_id="task_1", python_callable=dump)
    t2 = PythonOperator(task_id="task_2", python_callable=dump)
    t3 = PythonOperator(task_id="task_3", python_callable=dump)
    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=select_random,
        op_kwargs={
            "task_ids": [t2.task_id, t3.task_id],
        },
    )

    t1 >> branch >> [t2, t3]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  • 32-38 라인에 BranchPythonOperator 로 Task Instance를 정의합니다.
    • select_random 함수를 task_ids 의 파라미터 값인 [t2.task_id, t3.task_id] 를 함께 넘기며 호출합니다.
  • 26-27 라인의 select_random 함수는 task_ids 리스트 값중 하나를 랜덤으로 선택해 반환합니다.
    • t2.task_id, t3.task_id 중 하나가 선택됩니다.
    • 선택된 task가 실행됩니다.
  • 40 라인에 BranchPythonOperator 로 정의한 Task Instance 뒤에 분기에 따라 실행할 Task Instance를 담은 리스트가 있습니다.

조건에 따라 다른 Task 의존성을 갖는 방법을 정리해보면 다음과 같습니다.

  • 조건에 따라 분기 처리해야 할 Task Instance들을 List에 담아 BranchPythonOperator Task Instance에 의존하도록 설정합니다.,
  • BranchPythonOperator Task Instance는 이 List 에 담긴 Task Instance 중 하나의 Task Instance의 task_id를 반환합니다.

# Web UI

image-20220122171121697

Last Updated: 3/1/2022, 1:26:42 PM