Airflow - Slack Alert
- Airflow의 DAG 실행 중 에러가 발생하면 지정된 Slack 채널로 에러 메시지 전송
장점
- 실시간 알림 기능 : Airflow DAG의 상태를 실시간으로 Slack 채널에서 확인 가능
- 협업 효율성 향상 : Slack 채널에서의 알람을 통해 다수의 사용자가 에러를 빠르게 확인 가능
- 커스터마이징 : 알림 메시지 내용 및 형식을 필요에 따라 수정 가능하며 특정 이벤트에만 동작하도록 설정 가능
사용법
- Alert를 보낼 Slack 워크스페이스와 채널 결정
- https://api.slack.com/messaging/webhooks에 접속하여 App 생성
더보기
2-1. Create your Slack app 클릭
2-2. From scratch 선택
2-3. App Name & 워크스페이스 설정 후 Create App
2-4. Incoming Webhooks 메뉴로 이동 후 활성화
2-5. Add New Webhook to Workspace 클릭 후 채널 설정
2-6. Webhook URL을 통해 Slack Alert 설정
3. Airflow Web UI에서 slakc_url 추가 - [Webhook URL] services/ 뒤의 문자열
4. slack.py 파일 생성
from airflow.models import Variable
import logging
import requests
def on_failure_callback(context):
"""
https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
def on_success_callback(context):
"""
https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
text = str(context['task_instance'])
text += '성공적으로 실행되었습니다.'
send_message_to_a_slack_channel(text, ":scream:")
# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
# url = "https://slack.com/api/chat.postMessage"
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "rlawngh's Airflow", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
5. Slack Alert를 사용할 DAG 코드에 설정 추가
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
# slack alert 함수를 생성한 py
from plugins import slack
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *',
default_args = {
# 'retries': 1,
# 'retry_delay': timedelta(minutes=3),
'on_failure_callback': slack.on_failure_callback, # DAG 실패 시 Alert
'on_success_callback': slack.on_success_callback, # DAG 성공 시 Alert
}
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
6. Slack 채널 및 메시지 확인
'데이터 > Airflow' 카테고리의 다른 글
[Airflow] 8. Sensor? (0) | 2024.11.19 |
---|---|
[Airflow] 7. DAG Trigger? (0) | 2024.11.19 |
[Airflow] 5. Backfill? (1) | 2024.11.12 |
[Airflow] 4. Airflow.cfg? (0) | 2024.11.11 |
[Airflow] 3. Python Operator / Task Decorator (0) | 2024.11.10 |