# Operator와 함께 사용하기

Taskflow API와 기존의 방식인 Operator를 같이 사용하는 방법을 알아봅시다.

# Graph View

다음과 같은 Task 의존성을 갖는 DAG을 작성할 것입니다.

img.png

# Code



 
 















 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

 
 
 
 
 
 
 
 
 
 
 

 




from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from pendulum.tz.timezone import Timezone


@dag(
    dag_id="03_with_operators",
    description="Operator와 Taskflow API를 같이 활용하는 DAG 예제입니다.",
    default_args={
        "owner": "heumsi",
        "retries": 0,
    },
    start_date=datetime(2022, 1, 20, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@once",
    tags=["examples", "06_taskflow_api"],
)
def main():
    def get_first_word_from_param_and_return(param: str) -> str:
        """문자열 param의 첫 번째 단어를 출력하고, param을 반환합니다."""

        first_word = param.split()[0]
        print(first_word)
        return param

    @task
    def get_last_word_from_param_and_return(param: str) -> str:
        """문자열 param의 마지막 단어를 출력하고, param을 반환합니다."""

        last_word = param.split()[-1]
        print(last_word)
        return param

    def print_param(param: str) -> None:
        """문자열 param을 출력합니다"""
        print(param)

    task_1 = PythonOperator(
        task_id="get_first_word_from_param_and_return",
        python_callable=get_first_word_from_param_and_return,
        op_args=[
            "welcome to airflow tutorials",
        ],
    )
    task_2 = get_last_word_from_param_and_return(task_1.output)
    task_3 = PythonOperator(
        task_id="print_param", python_callable=print_param, op_args=[task_2]
    )

    task_1 >> task_2 >> task_3


dag = main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  • 전체적으로 Operator를 사용하는 방식과 큰 차이가 없습니다.
  • 27-33 번 라인에 @task 데코레이터로 Task를 정의한 부분이 있습니다.
  • 46 번 라인에 이 Task를 Task Instance로 정의합니다.
    • 이 때 task_1.output 으로 task_1return 값을 가져올 수 있습니다.
  • 51 번 라인에는 Task 간 의존 관계를 정의합니다. 크게 다른 부분은 없습니다.

# Web UI & Logs

DAG을 실행하면 다음과 같은 화면을 얻습니다.

img_1.png

각 Task Instance의 로그를 살펴보면 다음과 같습니다.

# get_first_word_from_param_and_return

img_2.png

[2022-01-31, 15:52:14 UTC] {logging_mixin.py:109} INFO - welcome
1

# get_last_word_from_param_and_return

img_3.png

[2022-01-31, 15:57:04 UTC] {logging_mixin.py:109} INFO - tutorials
1

img_4.png

[2022-01-31, 15:57:07 UTC] {logging_mixin.py:109} INFO - welcome to airflow tutorials
1
Last Updated: 3/1/2022, 1:26:42 PM