# 병렬 Task 리스트 의존성 작성하기
# Graph View
다음과 같은 Task 의존성을 가지는 DAG을 작성해봅시다.
# Code
from datetime import datetime, timedelta
from time import sleep
from airflow import DAG
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone
with DAG(
dag_id="03_parellel_task_list",
description="병렬적인 Task를 리스트로 가지는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 1,
"retry_delay": timedelta(minutes=1),
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "01_writing_various_task_flows"],
) as dag:
def dump() -> None:
sleep(3)
start_task = PythonOperator(task_id="start_task", python_callable=dump)
task_list = []
for i in range(5):
task = PythonOperator(task_id=f"task_{i}", python_callable=dump)
task_list.append(task)
end_task = PythonOperator(task_id="end_task", python_callable=dump)
start_task >> task_list >> end_task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
List
에 Task Instance를 담아 Task 의존성 정의하는 부분에 사용할 수 있습니다.- 의존성 앞 뒤에 있는 Task Instance에
List
내 모든 Task Instance가 연결됩니다.