데이터/Airflow

[Airflow] 8. Sensor?

김줘 2024. 11. 19. 21:04

Sensor?

  • 특정 조건이 충족될 때까지 작업의 실행을 대기시키는 Operator
  • 주로 외부 이벤트를 모니터링하거나 상태 확인에 사용

주요 특징

  • 상태 확인
    • 외부 시스템 또는 상태를 주기적으로 확인
    • 조건 충족 시 다음 Task 실행
  • Poke & Reschedule 모드
    • Poke : Sensor가 계속 실행되며 주기적으로 상태 확인
    • Reschedule : Sensor가 계속 실행되지 않고, 일정 시간 후 다시 상태 확인
  • Timeout
    • Sensor가 조건 충족 여부를 확인하다가 지정된 시간 내에 조건이 충족되지 않으면 실패 철리

종류

  • ExternalTaskSensor
    • 다른 DAG의 특정 Task 완료 여부 확인
  • FileSensor
    • 특정 경로에 파일이 생성될 때까지 대기
  • HttpSensor
    • 특정 HTTP 요청이 성공적으로 완료될 때까지 대기
  • S3KeySensor
    • S3 버킷에 특정 파일(Key)이 존재할 때까지 대기
  • SqlSensor
    • 데이터베이스에서 특정 조건이 충족될 때까지 대기

주요 매개변수

  • poke_interval : 상태를 확인하는 주기(초), 기본값 60초
  • timeout : 조건 충족을 기다리는 최대 시간(초)
  • soft_fail : True로 설정 시 Sensor가 timeout되어도 DAG가 실패하지 않고 종료
  • mode : poke(default value) or reschedule

mode

Poke

  • Sensor가 활성 상태를 유지
  • 설정된 poke_interval 주기마다 조건 확인
  • Sensor가 대기 중인 동안 Worker 슬롯 점유
  • 기본 동작 모드
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=10,  # 10초 간격으로 파일 존재 확인
    timeout=600,  # 10분 동안 대기
    mode='poke'  # 기본값
)

Reschedule

  • Sensor가 일정 시간동안 비활성 상태로 유지
  • poke_interval 간격으로 다시 활성화되어 조건 확인
  • Sensor가 대기 중일 때는 Worker 슬롯을 반환하여 다른 작업에 사용 가능
  • 리소스 효율적
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=10,  # 10초 간격으로 상태 확인
    timeout=600,  # 10분 동안 대기
    mode='reschedule'  # Reschedule 모드 활성화
)

 

주의사항

  • 장시간 대기 시
    • poke_interval, timeout 값의 적절한 조정 필요
    • Sensor의 과도한 사용으로 인해 Scheduler 성능에 영향 발생 가능
  • 리소스 효율성
    • 리소스 효율을 위해 poke가 아닌 reschedule 모드 권장