데이터/Airflow

[Airflow] 9. BranchPythonOperator?

김줘 2024. 11. 19. 21:18

BranchPythonOperator?

  • Airflow에서 특정 조건에 따라 DAG의 실행 경로를 분기할 수 있도록 돕는 Operator
  • 실행 후 다음에 실행할 Task ID 반환

특징

  • 조건부 흐름 제어 : 분기 로직을 Python 함수로 구현하여 특정 조건에 따라 실행 경로 결정
  • 한 번에 하나의 경로만 선택 : 반환된 Task ID만 실행되며, 선택되지 않은 경로의 Task는 Skipped 상태
  • 동적 Task 실행 : 데이터 또는 외부 입력에 따라 DAG의 실행 흐름 변경

작동 방식

  1. 분기 로직 실행 : 분기 로직을 작성한 함수 실행을 통해 Task ID 반환
  2. Task 선택 : 반환된 Task만 실행되며, 나머지 Task는 Skipped 상태
  3. DAG 흐름 진행 : 선택된 Task 실행 후 다음 Task 실행

예시

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2023, 1, 1)
}

dag = DAG(
    'Learn_BranchPythonOperator',
    schedule='@daily',
    default_args=default_args)


def decide_branch(**context):
    current_hour = datetime.now().hour
    print(f"current_hour: {current_hour}")
    if current_hour < 12:
        return 'morning_task'
    else:
        return 'afternoon_task'


branching_operator = BranchPythonOperator(
    task_id='branching_task',
    python_callable=decide_branch,
    dag=dag
)


morning_task = EmptyOperator(
    task_id='morning_task',
    dag=dag
)


afternoon_task = EmptyOperator(
    task_id='afternoon_task',
    dag=dag
)

# branching_operator >> morning_task
# branching_operator >> afternoon_task
branching_operator >> [morning_task, afternoon_task]


장점

  • 리소스 절약 : 조건에 따라 필요하지 않은 Task를 건너뛰어 리소스 절약
  • 워크플로우 제어 : DAG가 모든 Task를 실행할 필요가 없고, 특정 작업만 실행 가능
  • 조건에 따른 DAG 경로 분기 : 날짜, 데이터 상태, 외부 입력 등을 기준으로 실행 흐름 변경 가능