Sensor?
- 특정 조건이 충족될 때까지 작업의 실행을 대기시키는 Operator
- 주로 외부 이벤트를 모니터링하거나 상태 확인에 사용
주요 특징
- 상태 확인
- 외부 시스템 또는 상태를 주기적으로 확인
- 조건 충족 시 다음 Task 실행
- Poke & Reschedule 모드
- Poke : Sensor가 계속 실행되며 주기적으로 상태 확인
- Reschedule : Sensor가 계속 실행되지 않고, 일정 시간 후 다시 상태 확인
- Timeout
- Sensor가 조건 충족 여부를 확인하다가 지정된 시간 내에 조건이 충족되지 않으면 실패 철리
- Sensor가 조건 충족 여부를 확인하다가 지정된 시간 내에 조건이 충족되지 않으면 실패 철리
종류
- ExternalTaskSensor
- 다른 DAG의 특정 Task 완료 여부 확인
- FileSensor
- 특정 경로에 파일이 생성될 때까지 대기
- HttpSensor
- 특정 HTTP 요청이 성공적으로 완료될 때까지 대기
- S3KeySensor
- S3 버킷에 특정 파일(Key)이 존재할 때까지 대기
- SqlSensor
- 데이터베이스에서 특정 조건이 충족될 때까지 대기
- 데이터베이스에서 특정 조건이 충족될 때까지 대기
주요 매개변수
- poke_interval : 상태를 확인하는 주기(초), 기본값 60초
- timeout : 조건 충족을 기다리는 최대 시간(초)
- soft_fail : True로 설정 시 Sensor가 timeout되어도 DAG가 실패하지 않고 종료
- mode : poke(default value) or reschedule
mode
Poke
- Sensor가 활성 상태를 유지
- 설정된 poke_interval 주기마다 조건 확인
- Sensor가 대기 중인 동안 Worker 슬롯 점유
- 기본 동작 모드
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/path/to/file.txt',
poke_interval=10, # 10초 간격으로 파일 존재 확인
timeout=600, # 10분 동안 대기
mode='poke' # 기본값
)
Reschedule
- Sensor가 일정 시간동안 비활성 상태로 유지
- poke_interval 간격으로 다시 활성화되어 조건 확인
- Sensor가 대기 중일 때는 Worker 슬롯을 반환하여 다른 작업에 사용 가능
- 리소스 효율적
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/path/to/file.txt',
poke_interval=10, # 10초 간격으로 상태 확인
timeout=600, # 10분 동안 대기
mode='reschedule' # Reschedule 모드 활성화
)
주의사항
- 장시간 대기 시
- poke_interval, timeout 값의 적절한 조정 필요
- Sensor의 과도한 사용으로 인해 Scheduler 성능에 영향 발생 가능
- 리소스 효율성
- 리소스 효율을 위해 poke가 아닌 reschedule 모드 권장
'데이터 > Airflow' 카테고리의 다른 글
[Airflow] 10. Task Group? (0) | 2024.11.19 |
---|---|
[Airflow] 9. BranchPythonOperator? (1) | 2024.11.19 |
[Airflow] 7. DAG Trigger? (0) | 2024.11.19 |
[Airflow] 6. Slack Alert (0) | 2024.11.18 |
[Airflow] 5. Backfill? (1) | 2024.11.12 |