[Airflow] 7. DAG Trigger?

2024. 11. 19. 20:40·데이터/Airflow

DAG Trigger?

  • 하나의 DAG가 완료되거나, 특정 조건을 만족하면 다른 DAG를 트리거하는 기능
  • 다중 DAG 간의 의존성을 설정할 때 유용
  • 하나의 DAG 실행이 끝난 후 다른 DAG를 자동으로 실행 가능
  • DAG Trigger를 통해 여러개의 DAG를 연속적으로 실행하거나, 조건에 맞는 시점에 실행 가능

DAG Trigger 방법

Explict Trigger

  • TriggerDagRunOperator 사용
  • DAG A가 명시적으로 DAG B를 트리거

사용방법

더보기

1. airflow.cfg 파일의 [dag_run_conf_overrides_params] 값을 True로 설정

 

2. Trigger하는 DAG에서 TriggerDagRunOperator 정의

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

# 첫 번째 DAG 정의
dag1 = DAG('dag1', start_date=datetime(2024, 11, 19))

# 두 번째 DAG을 트리거하는 작업
trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='dag2',  # 트리거할 DAG의 ID
    conf={"param1": "value1", "param2": "value2"},  # conf 매개변수 전달
    dag=dag1
)

 

3. Trigger되는 DAG에서 매개변수 사용

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DagRun
from datetime import datetime

# 두 번째 DAG 정의
dag2 = DAG('dag2', start_date=datetime(2024, 11, 19))

def print_conf(**kwargs):
    # conf 파라미터 사용
    dag_run = kwargs.get('dag_run')
    if dag_run:
        conf = dag_run.conf  # 전달된 conf 값 가져오기
        print(conf)

# 두 번째 DAG의 작업
task1 = PythonOperator(
    task_id='print_conf_task',
    python_callable=print_conf,
    provide_context=True,
    dag=dag2
)

 


Reactive Trigger

  • ExternalTaskSensor 사용
  • DAG B가 DAG A의 Task가 끝나기를 대기
    • DAG A는 DAG B의 대기 여부를 모름

사용방법

더보기

1. Trigger의 대상이 될 DAG 작성

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

# 첫 번째 DAG 정의
dag1 = DAG('dag1', start_date=datetime(2024, 11, 19), schedule_interval=None, catchup=False)

# DAG 1 작업 정의
with dag1:
    task1 = DummyOperator(task_id='task1')

 

2. 대상의 상태를 확인하여 자기 자신을 Trigger 시킬 DAG 정의

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

# 두 번째 DAG 정의
dag2 = DAG('dag2', start_date=datetime(2024, 11, 19), schedule_interval=None, catchup=False)

# DAG 2 작업 정의
with dag2:
    # 다른 DAG(Task1)이 완료될 때까지 대기
    wait_for_task1 = ExternalTaskSensor(
        task_id='wait_for_task1',
        external_dag_id='dag1',  # 감시할 DAG ID
        external_task_id='task1',  # 감시할 Task ID
        poke_interval=60,  # 상태 확인 간격
        timeout=600  # 대기 시간 초과 시 에러 발생
    )

    proceed = DummyOperator(task_id='proceed_after_task1')

    wait_for_task1 >> proceed
저작자표시 (새창열림)

'데이터 > Airflow' 카테고리의 다른 글

[Airflow] 9. BranchPythonOperator?  (2) 2024.11.19
[Airflow] 8. Sensor?  (1) 2024.11.19
[Airflow] 6. Slack Alert  (0) 2024.11.18
[Airflow] 5. Backfill?  (1) 2024.11.12
[Airflow] 4. Airflow.cfg?  (1) 2024.11.11
'데이터/Airflow' 카테고리의 다른 글
  • [Airflow] 9. BranchPythonOperator?
  • [Airflow] 8. Sensor?
  • [Airflow] 6. Slack Alert
  • [Airflow] 5. Backfill?
김줘
김줘
김줘와 같이 데이터, 컴퓨터, IT 공부
  • 김줘
    초보개발자 김줘의 코딩일기
    김줘
  • 전체
    오늘
    어제
    • 분류 전체보기
      • 데이터 엔지니어링 데브코스
      • 데이터
        • Airflow
        • Spark
        • Kafka
        • dbt
      • TroubleShooting
      • Docker
      • AWS
      • 크롤링, 스크래핑, 시각화
        • Selenium
        • 시각화
      • 코딩테스트
        • 프로그래머스
        • 입출력과 사칙연산
        • 정렬
      • Django
      • 자바 공부
      • 끄적끄적
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    Java
    Airflow
    데이터 엔지니어링 데브코스 4기
    Azure
    자바
    데이터 엔지니어링 데브코스
    에어플로우
    TiL
    aws
    데브코스
    프로그래밍
    초보개발자
    파이썬
    티스토리챌린지
    프로그래머스
    오블완
    Python
    cloud
    부트캠프
    데이터 엔지니어
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
김줘
[Airflow] 7. DAG Trigger?
상단으로

티스토리툴바