[Airflow] 3. Python Operator / Task Decorator

2024. 11. 10. 18:44·데이터/Airflow

Python Operator

  • Airflow 1.x에서 주로 사용
  • Python 함수를 Task로 실행할 때 PythonOperator 객체 사용
  • 명시적으로 함수를 Task로 래핑하여 실행

예시 코드

더보기
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 추출 함수
def extract(url):
    print(f"Extracting data from {url}")
    return ['data1', 'data2', 'data3']  # 예시 데이터

# 변환 함수
def transform(data):
    print(f"Transforming data: {data}")
    return [item.upper() for item in data]  # 대문자 변환

# 적재 함수
def load(data):
    print(f"Loading data: {data}")

# DAG 정의
dag = DAG(
    dag_id='etl_example',
    start_date=datetime(2023, 4, 6),
    schedule_interval='@daily',  # 매일 실행
    catchup=False,
)

# 각 함수에 대한 PythonOperator 정의
extract_task = PythonOperator(
    task_id='extract_task',
    python_callable=extract,
    op_args=['http://example.com'],  # URL을 인자로 전달
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_task',
    python_callable=transform,
    op_args=["{{ task_instance.xcom_pull(task_ids='extract_task') }}"],  # extract_task의 결과를 XCom으로 가져옴
    dag=dag
)

load_task = PythonOperator(
    task_id='load_task',
    python_callable=load,
    op_args=["{{ task_instance.xcom_pull(task_ids='transform_task') }}"],  # transform_task의 결과를 XCom으로 가져옴
    dag=dag
)

# 태스크 간 순서 정의
extract_task >> transform_task >> load_task


특징

  • python_callabe을 통해 호출할 Python 함수를 지정
  • 각 Task를 PythonOperator 객체로 생성하며, 실행할 함수를 python_callable 인자에 전달
  • XCom을 명시적으로 사용
  • Type hint 미지원


Task Decorator

  • Airflow 2.x부터 도입된 방식
  • Python 함수를 Task로 변환하는 더 간결하고 직관적인 방법

예시 코드

더보기
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

# 추출 함수
@task
def extract(url):
    print(f"Extracting data from {url}")
    return ['data1', 'data2', 'data3']  # 예시 데이터

# 변환 함수
@task
def transform(data):
    print(f"Transforming data: {data}")
    return [item.upper() for item in data]  # 대문자 변환

# 적재 함수
@task
def load(data):
    print(f"Loading data: {data}")

# DAG 정의
dag = DAG(
    dag_id='etl_example_task_decorator',
    start_date=datetime(2023, 4, 6),
    schedule_interval='@daily',  # 매일 실행
    catchup=False,
)

# 각 함수 호출
data = extract('http://example.com')
transformed_data = transform(data)
load(transformed_data)

특징

  • Python 함수를 @task 데코레이터로 간단하게 Task로 변환
  • @dag 데코레이터로 DAG 정의를 간결하게 작성 가능
  • XCom을 명시적으로 사용할 필요 없이, 반환 값을 XCom에 자동으로 저장
  • Type hint 지원


Python Operator VS Task Decorator

특성 PythonOperator Task Decorator
사용법 명시적으로 PythonOperator 객체를 생성하고 python_callable에 함수를 전달 함수 위에 @task 데코레이터를 추가하여 간편하게 태스크로 변환
XCom 처리 수동으로 XCom을 처리 (ti.xcom_push() 등) 자동으로 XCom 처리 (반환 값을 자동으로 XCom을 통해 전달)
타입 힌트 없음 지원 (타입 힌트를 통해 함수 인자와 반환값을 정의 가능)
가독성 다소 길고 복잡 코드가 간결하고 Pythonic, 가독성 ↑
유연성 고급 설정이 필요한 경우 유리 (op_args, op_kwargs 등) 직관적이고 간단한 경우 유리, 복잡한 설정이 필요한 경우에는 제한적
버전 호환성 Airflow 1.x 및 2.x 모두 호환 Airflow 2.x 이상에서만 사용 가능
저작자표시 (새창열림)

'데이터 > Airflow' 카테고리의 다른 글

[Airflow] 6. Slack Alert  (0) 2024.11.18
[Airflow] 5. Backfill?  (1) 2024.11.12
[Airflow] 4. Airflow.cfg?  (1) 2024.11.11
[Airflow] 2. XCom?  (1) 2024.11.10
[Airflow] 1. Airflow?  (1) 2024.11.07
'데이터/Airflow' 카테고리의 다른 글
  • [Airflow] 5. Backfill?
  • [Airflow] 4. Airflow.cfg?
  • [Airflow] 2. XCom?
  • [Airflow] 1. Airflow?
김줘
김줘
김줘와 같이 데이터, 컴퓨터, IT 공부
  • 김줘
    초보개발자 김줘의 코딩일기
    김줘
  • 전체
    오늘
    어제
    • 분류 전체보기
      • 데이터 엔지니어링 데브코스
      • 데이터
        • Airflow
        • Spark
        • Kafka
        • dbt
      • TroubleShooting
      • Docker
      • AWS
      • 크롤링, 스크래핑, 시각화
        • Selenium
        • 시각화
      • 코딩테스트
        • 프로그래머스
        • 입출력과 사칙연산
        • 정렬
      • Django
      • 자바 공부
      • 끄적끄적
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    데이터 엔지니어링 데브코스
    TiL
    자바
    Azure
    Airflow
    초보개발자
    aws
    데이터 엔지니어
    에어플로우
    cloud
    부트캠프
    데브코스
    Java
    프로그래밍
    Python
    티스토리챌린지
    오블완
    프로그래머스
    파이썬
    데이터 엔지니어링 데브코스 4기
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
김줘
[Airflow] 3. Python Operator / Task Decorator
상단으로

티스토리툴바