3. Airflow Web UI에서 slack_url Variable 추가 - 값: [Webhook URL]의 services/ 뒤의 문자열
4. slack.py 파일 생성
from airflow.models import Variable
import logging
import requests
def on_failure_callback(context):
    """
    Post a failure alert to Slack for the task instance found in ``context``.

    The message is the string form of ``context['task_instance']`` followed by
    the string form of ``context.get('exception')`` wrapped in triple
    backticks, and it is sent with the ``:scream:`` emoji.

    Reference:
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html

    :param context: mapping that must contain ``'task_instance'``; the
        ``'exception'`` entry is optional (``None`` is rendered if missing).
    :return: None
    """
    task_instance = context['task_instance']
    error = context.get('exception')
    # ``!s`` forces str() on both values, matching plain string concatenation.
    message = f"{task_instance!s}```{error!s}```"
    send_message_to_a_slack_channel(message, ":scream:")
def on_success_callback(context):
    """
    Post a success alert to Slack for the task instance found in ``context``.

    The message is the string form of ``context['task_instance']`` followed by
    a fixed Korean sentence meaning "executed successfully".

    Reference:
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html

    :param context: mapping that must contain ``'task_instance'``.
    :return: None
    """
    text = str(context['task_instance'])
    text += '성공적으로 실행되었습니다.'
    # A success notice should not reuse the failure emoji (":scream:").
    send_message_to_a_slack_channel(text, ":white_check_mark:")
# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji, timeout=10):
    """
    Send ``message`` to Slack through an incoming-webhook URL.

    The webhook URL is built from the fixed prefix
    ``https://hooks.slack.com/services/`` plus the value of the Airflow
    Variable ``slack_url``.

    :param message: text of the Slack message.
    :param emoji: value for the payload's ``icon_emoji`` field (e.g. ``":scream:"``).
    :param timeout: seconds passed to ``requests.post`` as its ``timeout``;
        without one a stalled connection would block the caller indefinitely.
    :return: the ``requests`` response object of the POST.
    :raises requests.exceptions.RequestException: propagated unchanged from
        ``requests.post`` (connection errors, timeouts).
    """
    url = "https://hooks.slack.com/services/" + Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = {"username": "rlawngh's Airflow", "text": message, "icon_emoji": emoji}
    r = requests.post(url, json=data, headers=headers, timeout=timeout)
    if not r.ok:
        # Surface rejected posts in the log; the URL is deliberately not
        # logged because it contains the webhook secret.
        logging.getLogger(__name__).warning(
            "Slack webhook returned HTTP %s: %s", r.status_code, r.text
        )
    return r
5. Slack Alert를 사용할 DAG 코드에 설정 추가
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
# slack alert 함수를 생성한 py
from plugins import slack
@task
def print_hello():
    """Print the greeting ``hello!`` and return the same string."""
    greeting = "hello!"
    print(greeting)
    return greeting
@task
def print_goodbye():
    """Print the farewell ``goodbye!`` and return the same string."""
    farewell = "goodbye!"
    print(farewell)
    return farewell
# DAG "HelloWorld_v2": cron schedule '0 2 * * *', no catch-up of past runs,
# with the Slack alert callbacks supplied through default_args.
with DAG(
    dag_id="HelloWorld_v2",
    schedule="0 2 * * *",
    start_date=datetime(2022, 5, 5),
    catchup=False,
    tags=["example"],
    default_args={
        # Optional retry policy, left disabled (timedelta would need to be
        # imported from datetime before enabling it):
        # 'retries': 1,
        # 'retry_delay': timedelta(minutes=3),
        # Slack alert callbacks; given via default_args, so they are task
        # defaults rather than DAG-level callbacks.
        "on_failure_callback": slack.on_failure_callback,  # alert on failure
        "on_success_callback": slack.on_success_callback,  # alert on success
    },
) as dag:
    # Run print_hello first, then print_goodbye.
    print_hello() >> print_goodbye()