Slack Token 생성
airflow slack operator
pip install apache-airflow-providers-slack
# SlackAPIPostOperator ships in the apache-airflow-providers-slack package,
# so import it from the provider module rather than the legacy
# airflow.operators.slack_operator path.
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

# Post a fixed text message to a Slack channel using an API token.
SlackAPIPostOperator(
    task_id='failure',
    token='TOKEN',  # placeholder: supply the real Slack API token
    text='Hello World !',
    channel='real-estate',  # Replace with your Slack channel name or ID
    username='airflow',
)
Airflow UI > Admin > Connections
Conn Id: slack (아래 코드의 SLACK_CONN_ID 값과 동일해야 함)
Conn Type: HTTP
Host: https://hooks.slack.com/services/
Password: services/PASSWORD
(웹훅 URL에서 services의 하위 경로를 Password에 입력)
Code
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
    """Send a Slack message describing a failed task.

    Reads the webhook token from the password field of the
    ``SLACK_CONN_ID`` connection, formats the task id, DAG id, execution
    date and log URL into a message, and posts it through
    ``SlackWebhookOperator``.

    Args:
        context: Mapping that provides ``task_instance`` and
            ``execution_date`` for the failed task.

    Returns:
        The result of ``SlackWebhookOperator.execute``.
    """
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    # Look the task instance up once instead of once per field.
    task_instance = context.get('task_instance')
    # Built from explicit literals so the emitted text does not depend on
    # how this function body is indented.
    slack_msg = (
        "\n"
        ":red_circle: Task Failed.\n"
        "*Task*: {task}\n"
        "*Dag*: {dag}\n"
        "*Execution Time*: {exec_date}\n"
        "*Log Url*: {log_url}\n"
    ).format(
        task=task_instance.task_id,
        dag=task_instance.dag_id,
        exec_date=context.get('execution_date'),
        log_url=task_instance.log_url,
    )
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        # Same connection the token was read from; avoids a second
        # hard-coded copy of the connection id.
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
    )
    return failed_alert.execute(context=context)
1
2
3
4
5
6
7
8
# Default arguments passed to the DAG below; on task failure Airflow invokes
# the callback, which must be the alert function defined in this file.
args = {
    'owner': 'airflow',
    'on_failure_callback': task_fail_slack_alert,
}
with DAG(
dag_id='slack',
default_args=args,
- 참고
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- https://github.com/apache/airflow/blob/main/airflow/providers/slack/example_dags/example_slack.py
- https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/operators/slack/index.html#module-airflow.providers.slack.operators.slack