# Custom Operator
Airflow Operator는 직접 Custom 하게 작성할 수 있습니다. 작성자가 직접 Custom Operator를 만드는 방법에 대해 알아봅시다.
# Graph View
다음과 같은 간단한 Task 의존성을 가지는 DAG을 작성할 것입니다.
이 떄 print_sql
Task는 제가 직접 작성한 Custom Operator로 만들어진 Task입니다.
# Code
from datetime import datetime
from typing import Any, Dict
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
from pendulum.tz.timezone import Timezone
class PrintSQLOperator(BaseOperator):
ui_color = "#000000"
ui_fgcolor = "#ffffff"
template_fields = ["name", "sql"]
template_ext = [".sql"]
def __init__(self, name: str, sql: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.sql = sql
def execute(self, context: Dict[str, Any]) -> None:
print(f"name: {self.name}")
print(f"sql: {self.sql}")
with DAG(
dag_id="06_custom_operator",
description="CustomOperator를 사용하는 DAG 예제입니다.",
default_args={
"owner": "heumsi",
"retries": 0,
},
start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
schedule_interval="@once",
tags=["examples", "04_using_various_operators"],
) as dag:
task = PrintSQLOperator(
task_id="print_sql",
name="{{ task_instance.task_id }}",
sql="06_custom_operator.sql",
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
9-22
라인에PrintSQLOperator
라는 이름의 Custom Operator를 정의합니다.- Custom Operator는
BaseOperator
를 상속받아야 합니다. - 또한
execute
메서드를 정의해야 합니다.- Custom Operator로 정의된 Task Instance 실행 시
execute
함수 내용이 실행됩니다. - 따라서 Custom Operator의 메인 로직이
execute
함수 내에 담겨야 합니다.
- Custom Operator로 정의된 Task Instance 실행 시
ui_color
파라미터로 Web UI에서의 색을 지정할 수 있습니다. (위 Graph View에서 검은색)ui_fgcolor
파라미터로 Web UI에서의 색을 지정할 수 있습니다. (위 Graph View에서 흰색)template_fields
파라미터로 템플릿 문법을 지원할 필드를 지정할 수 있습니다.template_ext
파라미터로 템플리 문법을 지원할 파일 확장자를 지정할 수 있습니다.
- Custom Operator는
37-41
라인에PrintSQLOperator
로 Task Instance를 정의합니다.name
파라미터는PrintSQLOperator.template_fields
에 포함되기 때문에 템플릿 변수를 받을 수 있습니다.sql
파라미터는06_custom_operator.sql
라는 파일을 받고 있습니다.06_custom_operator.sql
은 다음과 같은 외부 파일입니다.SELECT * FROM my_table WHERE created_at >= '{{ data_interval_start.strftime("%Y-%m-%d %H:%M%:%S") }}'
1.sql
이PrintSQLOperator.template_ext
에 포함되기 때문에.sql
파일을 템플릿 문법으로 읽어올 수 있습니다.
# Web UI
실행 결과는 다음과 같습니다.