[Airflow] 6. Slack Alert

2024. 11. 18. 21:48·데이터/Airflow

Airflow - Slack Alert

  • Airflow의 DAG 실행 중 에러가 발생하면 지정된 Slack 채널로 에러 메시지 전송

장점

  • 실시간 알림 기능 : Airflow DAG의 상태를 실시간으로 Slack 채널에서 확인 가능
  • 협업 효율성 향상 : Slack 채널에서의 알람을 통해 다수의 사용자가 에러를 빠르게 확인 가능
  • 커스터마이징 : 알림 메시지 내용 및 형식을 필요에 따라 수정 가능하며 특정 이벤트에만 동작하도록 설정 가능

사용법

  1. Alert를 보낼 Slack 워크스페이스와 채널 결정
  2. https://api.slack.com/messaging/webhooks에 접속하여 App 생성
더보기

2-1. Create your Slack app 클릭

 

2-2. From scratch 선택

 

2-3. App Name & 워크스페이스 설정 후 Create App

 

2-4. Incoming Webhooks 메뉴로 이동 후 활성화

 

2-5. Add New Webhook to Workspace 클릭 후 채널 설정

 

2-6. Webhook URL을 통해 Slack Alert 설정

3. Airflow Web UI에서 slakc_url 추가 - [Webhook URL] services/ 뒤의 문자열

4. slack.py 파일 생성

from airflow.models import Variable

import logging
import requests

def on_failure_callback(context):
    """
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
    Define the callback to post on Slack if a failure is detected in the Workflow
    :return: operator.execute
    """
    text = str(context['task_instance'])
    text += "```" + str(context.get('exception')) +"```"
    send_message_to_a_slack_channel(text, ":scream:")

def on_success_callback(context):
    """
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
    Define the callback to post on Slack if a failure is detected in the Workflow
    :return: operator.execute
    """
    text = str(context['task_instance'])
    text += '성공적으로 실행되었습니다.'
    send_message_to_a_slack_channel(text, ":scream:")


# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
    # url = "https://slack.com/api/chat.postMessage"
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "rlawngh's Airflow", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    return r

5. Slack Alert를 사용할 DAG 코드에 설정 추가

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

# slack alert 함수를 생성한 py
from plugins import slack

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *',
    default_args = {
        # 'retries': 1,
        # 'retry_delay': timedelta(minutes=3),
        'on_failure_callback': slack.on_failure_callback, # DAG 실패 시 Alert
        'on_success_callback': slack.on_success_callback, # DAG 성공 시 Alert
    }
) as dag:

    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()

6. Slack 채널 및 메시지 확인

저작자표시 (새창열림)

'데이터 > Airflow' 카테고리의 다른 글

[Airflow] 8. Sensor?  (1) 2024.11.19
[Airflow] 7. DAG Trigger?  (0) 2024.11.19
[Airflow] 5. Backfill?  (1) 2024.11.12
[Airflow] 4. Airflow.cfg?  (1) 2024.11.11
[Airflow] 3. Python Operator / Task Decorator  (1) 2024.11.10
'데이터/Airflow' 카테고리의 다른 글
  • [Airflow] 8. Sensor?
  • [Airflow] 7. DAG Trigger?
  • [Airflow] 5. Backfill?
  • [Airflow] 4. Airflow.cfg?
김줘
김줘
김줘와 같이 데이터, 컴퓨터, IT 공부
  • 김줘
    초보개발자 김줘의 코딩일기
    김줘
  • 전체
    오늘
    어제
    • 분류 전체보기
      • 데이터 엔지니어링 데브코스
      • 데이터
        • Airflow
        • Spark
        • Kafka
        • dbt
      • TroubleShooting
      • Docker
      • AWS
      • 크롤링, 스크래핑, 시각화
        • Selenium
        • 시각화
      • 코딩테스트
        • 프로그래머스
        • 입출력과 사칙연산
        • 정렬
      • Django
      • 자바 공부
      • 끄적끄적
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    Java
    티스토리챌린지
    오블완
    프로그래밍
    초보개발자
    데이터 엔지니어
    데이터 엔지니어링 데브코스 4기
    자바
    에어플로우
    Azure
    프로그래머스
    TiL
    데브코스
    Python
    cloud
    파이썬
    Airflow
    데이터 엔지니어링 데브코스
    부트캠프
    aws
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
김줘
[Airflow] 6. Slack Alert
상단으로

티스토리툴바