BranchPythonOperator?
- Airflow에서 특정 조건에 따라 DAG의 실행 경로를 분기할 수 있도록 돕는 Operator
- 실행 후 다음에 실행할 Task ID 반환
특징
- 조건부 흐름 제어 : 분기 로직을 Python 함수로 구현하여 특정 조건에 따라 실행 경로 결정
- 한 번에 하나의 경로만 선택 : 반환된 Task ID만 실행되며, 선택되지 않은 경로의 Task는 Skipped 상태
- 동적 Task 실행 : 데이터 또는 외부 입력에 따라 DAG의 실행 흐름 변경
작동 방식
- 분기 로직 실행 : 분기 로직을 작성한 함수 실행을 통해 Task ID 반환
- Task 선택 : 반환된 Task만 실행되며, 나머지 Task는 Skipped 상태
- DAG 흐름 진행 : 선택된 Task 실행 후 다음 Task 실행
예시
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2023, 1, 1)
}
dag = DAG(
'Learn_BranchPythonOperator',
schedule='@daily',
default_args=default_args)
def decide_branch(**context):
current_hour = datetime.now().hour
print(f"current_hour: {current_hour}")
if current_hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
branching_operator = BranchPythonOperator(
task_id='branching_task',
python_callable=decide_branch,
dag=dag
)
morning_task = EmptyOperator(
task_id='morning_task',
dag=dag
)
afternoon_task = EmptyOperator(
task_id='afternoon_task',
dag=dag
)
# branching_operator >> morning_task
# branching_operator >> afternoon_task
branching_operator >> [morning_task, afternoon_task]
장점
- 리소스 절약 : 조건에 따라 필요하지 않은 Task를 건너뛰어 리소스 절약
- 워크플로우 제어 : DAG가 모든 Task를 실행할 필요가 없고, 특정 작업만 실행 가능
- 조건에 따른 DAG 경로 분기 : 날짜, 데이터 상태, 외부 입력 등을 기준으로 실행 흐름 변경 가능
'데이터 > Airflow' 카테고리의 다른 글
[Airflow] 10. Task Group? (0) | 2024.11.19 |
---|---|
[Airflow] 8. Sensor? (0) | 2024.11.19 |
[Airflow] 7. DAG Trigger? (0) | 2024.11.19 |
[Airflow] 6. Slack Alert (0) | 2024.11.18 |
[Airflow] 5. Backfill? (1) | 2024.11.12 |