# 병렬 Task 의존성 작성하기

# 사전 준비

Task를 병렬적(Parallel)으로 실행하기 위해서는 다음을 먼저 준비해야 합니다.

  1. Meta Database를 SQLite(default)가 아닌 Meta Database(MySQL, Postgres 등)로 바꿔주기
  2. Executor를 Sequential Executor(default)가 Executor(Local Executor 등)으로 바꿔주기

별도의 작업을 하지 않으면 Airflow는 기본적으로 Database는 SQLite를 사용하고 Executor는 Sequential Executor를 사용합니다. 그러나 이 값들은 실제 운영(Production) 환경에 올리기 적합하지 않으며, 공식 문서에도 실제 운영 환경에 Airflow를 배포할 때는 이 기본 값들을 바꿔서 쓰라고 말하고 있습니다.

여기서는 Meta Database를 Postgres로, Executor는 Local Executor로 바꾸어보겠습니다.

# 도커로 Postgres 배포하기

Postgres를 가장 간단하게 배포하는 중 하나는 도커를 이용하여 배포하는 것입니다.

TIP

도커에 대해 처음 들어보시는 분들은 subicura 님의 초보를 위한 도커 안내서 (opens new window)를 읽어보시길 추천드립니다.

새로운 셸을 열어 다음 명령어로 Postgres 컨테이너를 실행합니다.

$ docker run --name postgres -e POSTGRES_USER=airflow -e POSTGRES_PASSWORD=1234 -p 5432:5432 postgres:13
1

컨테이너가 제대로 동작하는지 또다른 셸에서 다음처럼 확인할 수 있습니다.

$ docker ps

CONTAINER ID   IMAGE         COMMAND                  CREATED         STATUS              PORTS                    NAMES
c0b60f349279   postgres:13   "docker-entrypoint.s…"   3 minutes ago   Up About a minute   0.0.0.0:5432->5432/tcp   postgre
1
2
3
4

# Airflow에 Postgres 연결하기

가상환경에 진입 후, Postgres와 연결할 수 있는 드라이버 관련 파이썬 패키지를 다음처럼 설치합니다.

$ pip install psycopg2
1

설치가 완료되면 $AIRFLOW_HOME 경로에서 airflow.cfg 파일을 찾습니다. airflow.cfg 파일 내에서 sql_alchemy_conn 를 검색해보면 이 값이 다음처럼 설정이 되어있습니다.

sql_alchemy_conn = sqlite:///./airflow.db
1

이 값을 다음처럼 postgres를 사용하도록 변경합니다.

sql_alchemy_conn = postgresql+psycopg2://airflow:1234@localhost:5432/airflow
1

TIP

sql_alchemy_conn 의 포맷은 SQLAlchemy에서 Engine 인스턴스를 만들 때 사용하는 URL 포맷입니다. 이에 대한 내용은 SQLAlchemy 공식 문서 (opens new window)에서 확인하실 수 있습니다.

참고로 SQLAlchemy (opens new window)는 파이썬 애플리케이션에서 Database와 연결하기 위해 사용하는 가장 대표적인 라이브러리이며, 수많은 파이썬 오픈소스에서 사용되고 있습니다.

설정을 완료했다면, Airflow에서 Database를 다시 초기화 해야 합니다. 다음 명령어로 초기화합니다.

$ airflow db init
1

이제 Airflow 관련 Meta Database로 SQLite가 아닌 새로 배포한 Postgres를 연결하도록 설정 세팅을 완료했습니다.

# Local Executor 사용하기

$AIRFLOW_HOME 경로의 airflow.cfg 파일 내에서 executor 를 찾습니다. 이 값은 다음처럼 되어있을 것입니다.

executor = SequentialExecutor
1

이 값을 다음처럼 바꿔줍니다.

executor = LocalExecutor
1

이제 Scheduler가 사용할 Executor로 SequentialExecutor Executor가 아닌 LocalExecutor를 사용하도록 설정 세팅을 완료했습니다.

# Airflow Scheduler 재기동하기

바꿔준 이후 Airflow Scheduler를 다시 실행합니다. (만약 켜져있는 상태면 먼저 종료시킵니다.)

$ airflow scheduler
1

# Graph View

다음과 같이 병렬로 Task 의존성을 가지는 DAG을 작성해봅시다.

image-20220122122532097

# Code






























 
 

from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone

with DAG(
    dag_id="02_parallel_tasks",
    description="병렬적인 Task 의존성을 가지는 DAG 예제입니다.",
    default_args={
        "owner": "heumsi",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@once",
    tags=["examples", "01_writing_various_task_flows"],
) as dag:

    def dump() -> None:
        sleep(3)

    task_1 = PythonOperator(task_id="task_1", python_callable=dump)
    task_2 = PythonOperator(task_id="task_2", python_callable=dump)
    task_3 = PythonOperator(task_id="task_3", python_callable=dump)
    task_4 = PythonOperator(task_id="task_4", python_callable=dump)
    task_5 = PythonOperator(task_id="task_5", python_callable=dump)

    task_1 >> task_2 >> task_5
    task_3 >> task_4 >> task_5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  • Task 간 의존성은 with(...) 구문 내 어디에 작성되어도 괜찮습니다.
  • Scheduler는 DAG 파일을 파싱할 때 Task 간 의존성을 정의하는 라인을 모두 파싱하여 DAG을 그립니다.

# Web UI

image-20220122122006564

Last Updated: 3/1/2022, 1:26:42 PM