
[Airflow] Sending Slack Alerts When an Error Occurs

Creating a Slack Token

Airflow Slack operator

pip install apache-airflow-providers-slack
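from airflow.providers.slack.operators.slack import SlackAPIPostOperator

# Post a fixed message to a Slack channel.
# Note: newer provider releases may expect slack_conn_id instead of token.
SlackAPIPostOperator(
    task_id='failure',
    token='TOKEN',  # Slack API token
    text='Hello World !',
    channel='real-estate',  # Replace with your Slack channel name
    username='airflow'
)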
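For context, SlackAPIPostOperator posts through Slack's chat.postMessage Web API method. The following is a minimal sketch of an equivalent request using only the Python standard library. The helper name `build_post_message_request` is hypothetical, and the request is only built here, not sent.

```python
import json
import urllib.request

SLACK_POST_MESSAGE_URL = "https://slack.com/api/chat.postMessage"


def build_post_message_request(token, channel, text, username=None):
    """Build (but do not send) a chat.postMessage request."""
    payload = {"channel": channel, "text": text}
    if username:
        # Overriding the username may require extra scopes on the Slack app.
        payload["username"] = username
    return urllib.request.Request(
        SLACK_POST_MESSAGE_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json; charset=utf-8",
        },
        method="POST",
    )


# Same placeholder values as the operator example above.
request = build_post_message_request(
    "TOKEN", "real-estate", "Hello World !", username="airflow"
)
```

Passing `request` to `urllib.request.urlopen` would send the message. Inside a DAG, the operator handles this call for you.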

  • Airflow UI > Admin > Connections
    • Conn Type: HTTP
    • Host: https://hooks.slack.com/services/
    • Password: the part of the webhook URL that follows services/ (enter everything under services/ as the password)
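The split between Host and Password described above can be sketched as a small helper. The function name `split_webhook_url` and the sample URL are made up for illustration, so substitute the webhook URL that Slack issued for your workspace.

```python
SLACK_HOST = "https://hooks.slack.com/services/"


def split_webhook_url(webhook_url):
    """Split a Slack webhook URL into the Host and Password connection fields."""
    if not webhook_url.startswith(SLACK_HOST):
        raise ValueError("not a Slack incoming-webhook URL: " + webhook_url)
    # Everything after services/ goes into the Password field.
    return SLACK_HOST, webhook_url[len(SLACK_HOST):]


# Placeholder URL, not a real webhook.
host, password = split_webhook_url(
    "https://hooks.slack.com/services/T000/B000/XXXX"
)
print("Host:", host)
print("Password:", password)
```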

Code

from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

SLACK_CONN_ID = 'slack'  # Conn Id of the connection created in the Airflow UI


def task_fail_slack_alert(context):
    """Failure callback: post the failed task's details to Slack."""
    # The webhook token is stored in the connection's Password field.
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    task_instance = context.get('task_instance')
    slack_msg = """
            :red_circle: Task Failed.
            *Task*: {task}
            *Dag*: {dag}
            *Execution Time*: {exec_date}
            *Log Url*: {log_url}
            """.format(
        task=task_instance.task_id,
        dag=task_instance.dag_id,
        exec_date=context.get('execution_date'),
        log_url=task_instance.log_url,
    )
    # Argument names follow the older provider API used in this post.
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow')
    return failed_alert.execute(context=context)
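To check the message formatting without running Airflow, the string-building step can be pulled into a plain function and fed a fake context. This is only a sketch: `build_failure_message` is a hypothetical helper, and the fake context mimics just the attributes the callback reads (`task_id`, `dag_id`, `log_url`, and `execution_date`) with made-up values.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Format the Slack failure message from an Airflow-style context dict."""
    ti = context["task_instance"]
    return (
        ":red_circle: Task Failed.\n"
        f"*Task*: {ti.task_id}\n"
        f"*Dag*: {ti.dag_id}\n"
        f"*Execution Time*: {context.get('execution_date')}\n"
        f"*Log Url*: {ti.log_url}"
    )


# Fake context standing in for what Airflow passes to on_failure_callback.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="extract",
        dag_id="slack",
        log_url="http://localhost:8080/log",
    ),
    "execution_date": "2024-01-01T00:00:00+00:00",
}

message = build_failure_message(fake_context)
print(message)
```

Keeping the formatting in a separate function means the callback only has to look up the connection and send the message.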
args = {
    'owner': 'airflow',
    'on_failure_callback': task_fail_slack_alert
}

with DAG(
    dag_id='slack',
    default_args=args,
This post is licensed under CC BY 4.0 by the author.
