DAG Trigger?
- 하나의 DAG가 완료되거나, 특정 조건을 만족하면 다른 DAG를 트리거하는 기능
- 다중 DAG 간의 의존성을 설정할 때 유용
- 하나의 DAG 실행이 끝난 후 다른 DAG를 자동으로 실행 가능
- DAG Trigger를 통해 여러개의 DAG를 연속적으로 실행하거나, 조건에 맞는 시점에 실행 가능
DAG Trigger 방법
Explict Trigger
- TriggerDagRunOperator 사용
- DAG A가 명시적으로 DAG B를 트리거
사용방법
더보기
1. airflow.cfg 파일의 [dag_run_conf_overrides_params] 값을 True로 설정
2. Trigger하는 DAG에서 TriggerDagRunOperator 정의
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
# 첫 번째 DAG 정의
dag1 = DAG('dag1', start_date=datetime(2024, 11, 19))
# 두 번째 DAG을 트리거하는 작업
trigger_task = TriggerDagRunOperator(
task_id='trigger_task',
trigger_dag_id='dag2', # 트리거할 DAG의 ID
conf={"param1": "value1", "param2": "value2"}, # conf 매개변수 전달
dag=dag1
)
3. Trigger되는 DAG에서 매개변수 사용
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DagRun
from datetime import datetime
# 두 번째 DAG 정의
dag2 = DAG('dag2', start_date=datetime(2024, 11, 19))
def print_conf(**kwargs):
# conf 파라미터 사용
dag_run = kwargs.get('dag_run')
if dag_run:
conf = dag_run.conf # 전달된 conf 값 가져오기
print(conf)
# 두 번째 DAG의 작업
task1 = PythonOperator(
task_id='print_conf_task',
python_callable=print_conf,
provide_context=True,
dag=dag2
)
Reactive Trigger
- ExternalTaskSensor 사용
- DAG B가 DAG A의 Task가 끝나기를 대기
- DAG A는 DAG B의 대기 여부를 모름
사용방법
더보기
1. Trigger의 대상이 될 DAG 작성
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
# 첫 번째 DAG 정의
dag1 = DAG('dag1', start_date=datetime(2024, 11, 19), schedule_interval=None, catchup=False)
# DAG 1 작업 정의
with dag1:
task1 = DummyOperator(task_id='task1')
2. 대상의 상태를 확인하여 자기 자신을 Trigger 시킬 DAG 정의
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime
# 두 번째 DAG 정의
dag2 = DAG('dag2', start_date=datetime(2024, 11, 19), schedule_interval=None, catchup=False)
# DAG 2 작업 정의
with dag2:
# 다른 DAG(Task1)이 완료될 때까지 대기
wait_for_task1 = ExternalTaskSensor(
task_id='wait_for_task1',
external_dag_id='dag1', # 감시할 DAG ID
external_task_id='task1', # 감시할 Task ID
poke_interval=60, # 상태 확인 간격
timeout=600 # 대기 시간 초과 시 에러 발생
)
proceed = DummyOperator(task_id='proceed_after_task1')
wait_for_task1 >> proceed
'데이터 > Airflow' 카테고리의 다른 글
[Airflow] 9. BranchPythonOperator? (1) | 2024.11.19 |
---|---|
[Airflow] 8. Sensor? (0) | 2024.11.19 |
[Airflow] 6. Slack Alert (0) | 2024.11.18 |
[Airflow] 5. Backfill? (1) | 2024.11.12 |
[Airflow] 4. Airflow.cfg? (0) | 2024.11.11 |