Python Operator
- Airflow 1.x에서 주로 사용
- Python 함수를 Task로 실행할 때 PythonOperator 객체 사용
- 명시적으로 함수를 Task로 래핑하여 실행
예시 코드
더보기
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 추출 함수
def extract(url):
print(f"Extracting data from {url}")
return ['data1', 'data2', 'data3'] # 예시 데이터
# 변환 함수
def transform(data):
print(f"Transforming data: {data}")
return [item.upper() for item in data] # 대문자 변환
# 적재 함수
def load(data):
print(f"Loading data: {data}")
# DAG 정의
dag = DAG(
dag_id='etl_example',
start_date=datetime(2023, 4, 6),
schedule_interval='@daily', # 매일 실행
catchup=False,
)
# 각 함수에 대한 PythonOperator 정의
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract,
op_args=['http://example.com'], # URL을 인자로 전달
dag=dag
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform,
op_args=["{{ task_instance.xcom_pull(task_ids='extract_task') }}"], # extract_task의 결과를 XCom으로 가져옴
dag=dag
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load,
op_args=["{{ task_instance.xcom_pull(task_ids='transform_task') }}"], # transform_task의 결과를 XCom으로 가져옴
dag=dag
)
# 태스크 간 순서 정의
extract_task >> transform_task >> load_task
특징
- python_callabe을 통해 호출할 Python 함수를 지정
- 각 Task를 PythonOperator 객체로 생성하며, 실행할 함수를 python_callable 인자에 전달
- XCom을 명시적으로 사용
- Type hint 미지원
Task Decorator
- Airflow 2.x부터 도입된 방식
- Python 함수를 Task로 변환하는 더 간결하고 직관적인 방법
예시 코드
더보기
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
# 추출 함수
@task
def extract(url):
print(f"Extracting data from {url}")
return ['data1', 'data2', 'data3'] # 예시 데이터
# 변환 함수
@task
def transform(data):
print(f"Transforming data: {data}")
return [item.upper() for item in data] # 대문자 변환
# 적재 함수
@task
def load(data):
print(f"Loading data: {data}")
# DAG 정의
dag = DAG(
dag_id='etl_example_task_decorator',
start_date=datetime(2023, 4, 6),
schedule_interval='@daily', # 매일 실행
catchup=False,
)
# 각 함수 호출
data = extract('http://example.com')
transformed_data = transform(data)
load(transformed_data)
특징
- Python 함수를 @task 데코레이터로 간단하게 Task로 변환
- @dag 데코레이터로 DAG 정의를 간결하게 작성 가능
- XCom을 명시적으로 사용할 필요 없이, 반환 값을 XCom에 자동으로 저장
- Type hint 지원
Python Operator VS Task Decorator
특성 | PythonOperator | Task Decorator |
---|---|---|
사용법 | 명시적으로 PythonOperator 객체를 생성하고 python_callable 에 함수를 전달 |
함수 위에 @task 데코레이터를 추가하여 간편하게 태스크로 변환 |
XCom 처리 | 수동으로 XCom을 처리 (ti.xcom_push() 등) |
자동으로 XCom 처리 (반환 값을 자동으로 XCom을 통해 전달) |
타입 힌트 | 없음 | 지원 (타입 힌트를 통해 함수 인자와 반환값을 정의 가능) |
가독성 | 다소 길고 복잡 | 코드가 간결하고 Pythonic, 가독성 ↑ |
유연성 | 고급 설정이 필요한 경우 유리 (op_args , op_kwargs 등) |
직관적이고 간단한 경우 유리, 복잡한 설정이 필요한 경우에는 제한적 |
버전 호환성 | Airflow 1.x 및 2.x 모두 호환 | Airflow 2.x 이상에서만 사용 가능 |
'데이터 > Airflow' 카테고리의 다른 글
[Airflow] 6. Slack Alert (0) | 2024.11.18 |
---|---|
[Airflow] 5. Backfill? (1) | 2024.11.12 |
[Airflow] 4. Airflow.cfg? (0) | 2024.11.11 |
[Airflow] 2. XCom? (1) | 2024.11.10 |
[Airflow] 1. Airflow? (1) | 2024.11.07 |