데이터/Airflow

[Airflow] 3. Python Operator / Task Decorator

김줘 2024. 11. 10. 18:44

Python Operator

  • Airflow 1.x에서 주로 사용
  • Python 함수를 Task로 실행할 때 PythonOperator 객체 사용
  • 명시적으로 함수를 Task로 래핑하여 실행

예시 코드

더보기
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 추출 함수
def extract(url):
    print(f"Extracting data from {url}")
    return ['data1', 'data2', 'data3']  # 예시 데이터

# 변환 함수
def transform(data):
    print(f"Transforming data: {data}")
    return [item.upper() for item in data]  # 대문자 변환

# 적재 함수
def load(data):
    print(f"Loading data: {data}")

# DAG 정의
dag = DAG(
    dag_id='etl_example',
    start_date=datetime(2023, 4, 6),
    schedule_interval='@daily',  # 매일 실행
    catchup=False,
)

# 각 함수에 대한 PythonOperator 정의
extract_task = PythonOperator(
    task_id='extract_task',
    python_callable=extract,
    op_args=['http://example.com'],  # URL을 인자로 전달
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_task',
    python_callable=transform,
    op_args=["{{ task_instance.xcom_pull(task_ids='extract_task') }}"],  # extract_task의 결과를 XCom으로 가져옴
    dag=dag
)

load_task = PythonOperator(
    task_id='load_task',
    python_callable=load,
    op_args=["{{ task_instance.xcom_pull(task_ids='transform_task') }}"],  # transform_task의 결과를 XCom으로 가져옴
    dag=dag
)

# 태스크 간 순서 정의
extract_task >> transform_task >> load_task


특징

  • python_callabe을 통해 호출할 Python 함수를 지정
  • 각 Task를 PythonOperator 객체로 생성하며, 실행할 함수를 python_callable 인자에 전달
  • XCom을 명시적으로 사용
  • Type hint 미지원


Task Decorator

  • Airflow 2.x부터 도입된 방식
  • Python 함수를 Task로 변환하는 더 간결하고 직관적인 방법

예시 코드

더보기
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

# 추출 함수
@task
def extract(url):
    print(f"Extracting data from {url}")
    return ['data1', 'data2', 'data3']  # 예시 데이터

# 변환 함수
@task
def transform(data):
    print(f"Transforming data: {data}")
    return [item.upper() for item in data]  # 대문자 변환

# 적재 함수
@task
def load(data):
    print(f"Loading data: {data}")

# DAG 정의
dag = DAG(
    dag_id='etl_example_task_decorator',
    start_date=datetime(2023, 4, 6),
    schedule_interval='@daily',  # 매일 실행
    catchup=False,
)

# 각 함수 호출
data = extract('http://example.com')
transformed_data = transform(data)
load(transformed_data)

특징

  • Python 함수를 @task 데코레이터로 간단하게 Task로 변환
  • @dag 데코레이터로 DAG 정의를 간결하게 작성 가능
  • XCom을 명시적으로 사용할 필요 없이, 반환 값을 XCom에 자동으로 저장
  • Type hint 지원


Python Operator VS Task Decorator

특성 PythonOperator Task Decorator
사용법 명시적으로 PythonOperator 객체를 생성하고 python_callable에 함수를 전달 함수 위에 @task 데코레이터를 추가하여 간편하게 태스크로 변환
XCom 처리 수동으로 XCom을 처리 (ti.xcom_push() 등) 자동으로 XCom 처리 (반환 값을 자동으로 XCom을 통해 전달)
타입 힌트 없음 지원 (타입 힌트를 통해 함수 인자와 반환값을 정의 가능)
가독성 다소 길고 복잡 코드가 간결하고 Pythonic, 가독성 ↑
유연성 고급 설정이 필요한 경우 유리 (op_args, op_kwargs 등) 직관적이고 간단한 경우 유리, 복잡한 설정이 필요한 경우에는 제한적
버전 호환성 Airflow 1.x 및 2.x 모두 호환 Airflow 2.x 이상에서만 사용 가능