공부, 기록

[AIRFLOW, SPARK] AIRFLOW TEAMS 알람 설정 값 본문

공부/소소한 개발

[AIRFLOW, SPARK] AIRFLOW TEAMS 알람 설정 값

무는빼주세요 2022. 9. 4. 16:08

1. 아래 두 파이썬 파일을 dags 폴더에 추가합니다.

ms_teams_webhook_operator.py

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from ms_teams_webhook_hook import MSTeamsWebhookHook
import logging
 
 
class MSTeamsWebhookOperator(SimpleHttpOperator):
    """
    Post a message card to MS Teams using the Incoming Webhook connector.

    Takes the MS Teams webhook token directly and/or a connection that has the
    MS Teams webhook token. If both are supplied, the webhook token is appended
    to the host in the connection.

    :param http_conn_id: connection that has the MS Teams webhook URL
    :type http_conn_id: str
    :param webhook_token: MS Teams webhook token
    :type webhook_token: str
    :param message: the message body to send to MS Teams (templated field)
    :type message: str
    :param title: the title of the message card
    :type title: str
    :param subtitle: the subtitle of the message to send (templated field)
    :type subtitle: str
    :param button_text: the text of the action button
    :type button_text: str
    :param button_url: the URL opened when the action button is clicked
    :type button_url: str
    :param theme_color: hex code of the card theme, without the leading ``#``
    :type theme_color: str
    :param proxy: proxy to use when making the webhook request; forwarded to
        MSTeamsWebhookHook. NOTE(review): the hook published alongside this
        operator reads its proxy from the connection's ``extra`` instead of this
        value -- confirm before relying on it.
    :type proxy: str
    """

    # Attributes Airflow renders as templates before execute() runs.
    template_fields = ('message', 'subtitle',)

    @apply_defaults
    def __init__(self,
                 http_conn_id=None,
                 webhook_token=None,
                 message="",
                 title="",
                 subtitle="",
                 button_text="",
                 button_url="",
                 theme_color="00FF00",
                 proxy=None,
                 *args,
                 **kwargs):

        # The webhook token doubles as the endpoint of the underlying SimpleHttpOperator.
        super(MSTeamsWebhookOperator, self).__init__(endpoint=webhook_token, *args, **kwargs)
        self.http_conn_id = http_conn_id
        self.webhook_token = webhook_token
        self.message = message
        self.title = title
        self.subtitle = subtitle
        self.button_text = button_text
        self.button_url = button_url
        self.theme_color = theme_color
        self.proxy = proxy
        # Created in execute().
        self.hook = None

    def execute(self, context):
        """
        Build an MSTeamsWebhookHook from this operator's settings and send the card.

        :param context: Airflow task context (not used by this method)
        """
        # Pass everything by keyword: the hook's positional order is
        # (http_conn_id, webhook_token, message, title, ...), so passing
        # title before message positionally swapped the card's title and text.
        self.hook = MSTeamsWebhookHook(
            http_conn_id=self.http_conn_id,
            webhook_token=self.webhook_token,
            message=self.message,
            title=self.title,
            subtitle=self.subtitle,
            button_text=self.button_text,
            button_url=self.button_url,
            theme_color=self.theme_color,
            proxy=self.proxy,
        )
        self.hook.execute()
        logging.info("Webhook request sent to MS Teams")

 

ms_teams_webhook_hook.py

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.hooks.http_hook import HttpHook
from airflow.exceptions import AirflowException
 
 
class MSTeamsWebhookHook(HttpHook):
    """
    Post message cards to MS Teams using the Incoming Webhook connector.

    Takes the MS Teams webhook token directly and/or a connection that has the
    MS Teams webhook token. A directly supplied token wins; otherwise the
    ``webhook_token`` key of the connection's ``extra`` JSON is used. The token
    is sent as the request endpoint, i.e. appended to the connection's host.

    :param http_conn_id: connection that has the MS Teams webhook URL
    :type http_conn_id: str
    :param webhook_token: MS Teams webhook token
    :type webhook_token: str
    :param message: the message body to send to MS Teams
    :type message: str
    :param title: the title of the message card
    :type title: str
    :param subtitle: the subtitle of the message to send
    :type subtitle: str
    :param button_text: the text of the action button
    :type button_text: str
    :param button_url: the URL opened when the action button is clicked
    :type button_url: str
    :param theme_color: hex code of the card theme, without the leading ``#``
    :type theme_color: str
    :param proxy: stored on the hook but NOT used by ``execute``; the proxy is
        read from the ``proxy`` key of the connection's ``extra`` JSON instead
    :type proxy: str
    :raises AirflowException: if neither a token nor a connection id is given
    """
    def __init__(self,
                 http_conn_id=None,
                 webhook_token=None,
                 message="",
                 title="",
                 subtitle="",
                 button_text="",
                 button_url="",
                 theme_color="00FF00",
                 proxy=None,
                 *args,
                 **kwargs
                 ):
        super(MSTeamsWebhookHook, self).__init__(*args, **kwargs)
        # Assigned after super().__init__() so it overrides any value the base class set.
        self.http_conn_id = http_conn_id
        self.webhook_token = self.get_token(webhook_token, http_conn_id)
        self.message = message
        self.title = title
        self.subtitle = subtitle
        self.button_text = button_text
        self.button_url = button_url
        self.theme_color = theme_color
        self.proxy = proxy

    def get_proxy(self, http_conn_id):
        """
        Return the proxy URL stored under ``proxy`` in the connection's extra JSON.

        :param http_conn_id: connection to read the proxy from
        :return: the proxy URL, or ``''`` when no connection is given or it has none
        """
        if not http_conn_id:
            # Without a connection there is nowhere to look up a proxy: use none.
            return ''
        conn = self.get_connection(http_conn_id)
        # Never print/log the extras: they may also hold the webhook_token secret.
        return conn.extra_dejson.get("proxy") or ''

    def get_token(self, token, http_conn_id):
        """
        Given either a manually set token or a conn_id, return the webhook_token to use.

        :param token: the manually provided token
        :param http_conn_id: the connection id to fall back to
        :return: webhook_token (str) to use
        :raises AirflowException: if neither ``token`` nor ``http_conn_id`` is given
        """
        if token:
            return token
        elif http_conn_id:
            conn = self.get_connection(http_conn_id)
            extra = conn.extra_dejson
            return extra.get('webhook_token', '')
        else:
            raise AirflowException('Cannot get URL: No valid MS Teams '
                                   'webhook URL nor conn_id supplied')

    # Customize the message card content here.
    def build_message(self):
        """
        Build the MessageCard payload posted to the webhook.

        The card is serialized with ``json.dumps`` so quotes, backslashes and
        newlines in the text fields are escaped and the result is always valid JSON.

        :return: the card serialized as a JSON string
        """
        import json  # local import keeps this plugin snippet self-contained

        card = {
            "@type": "MessageCard",
            "@context": "http://schema.org/extensions",
            "themeColor": self.theme_color,
            "title": self.title,
            "text": self.message,
            "sections": [{
                "activityTitle": "Error Point",
                "activitySubtitle": self.subtitle,
                "markdown": True,
                "potentialAction": [{
                    "@type": "OpenUri",
                    "name": self.button_text,
                    "targets": [{"os": "default", "uri": self.button_url}],
                }],
            }],
        }
        return json.dumps(card)

    def execute(self):
        """
        Send the message card to the MS Teams webhook.

        The proxy comes from the connection's ``extra`` (see ``get_proxy``); values
        of 5 characters or fewer are treated as "no proxy".
        """
        proxies = {}
        proxy_url = self.get_proxy(self.http_conn_id)
        if len(proxy_url) > 5:
            # Only the https scheme is routed through the proxy.
            proxies = {'https': proxy_url}
            # The URL itself is not logged: proxy URLs can embed credentials.
            self.log.info("Sending MS Teams webhook request through a proxy")
        else:
            self.log.info("Sending MS Teams webhook request without a proxy")

        self.run(endpoint=self.webhook_token,
                 data=self.build_message(),
                 headers={'Content-type': 'application/json'},
                 extra_options={'proxies': proxies})

2. AIRFLOW DAG 파일에 on_failure 함수를 선언하고 default_args에 추가합니다.

from ms_teams_webhook_operator import MSTeamsWebhookOperator
 
def on_failure(context):
    """
    Failure callback: post an MS Teams alert for the failed task.

    Meant to be registered as ``on_failure_callback`` (see ``default_args``).

    :param context: Airflow task context; ``dag_run``, ``task_instance`` and
        ``ts`` are read from it
    """
    from urllib.parse import urlencode  # local import keeps this snippet copy-pasteable

    dag_id = context['dag_run'].dag_id  # DAG that is running
    task_id = context['task_instance'].task_id  # task that failed
    # Stores a True flag in XCom under the DAG id (presumably read elsewhere -- confirm).
    context['task_instance'].xcom_push(key=dag_id, value=True)

    # URL-encode every query parameter; the timestamp contains ':' and '+'.
    query = urlencode({
        'dag_id': dag_id,
        'task_id': task_id,
        'execution_date': context['ts'],
    })
    # 'hostname입력' ("enter hostname") is a placeholder for the webserver base URL.
    logs_url = "hostname입력/log?" + query

    teams_notification = MSTeamsWebhookOperator(
        task_id="msteams_notify_failure", trigger_rule="all_done",
        message="`{}` has failed on task: `{}`".format(dag_id, task_id),
        title='Airflow Error Alert',
        subtitle=task_id,
        button_text="View log", button_url=logs_url,
        theme_color="FF0000",
        http_conn_id='airflow-alert',  # connection id added under Airflow Connections
        # Placeholder value: the hook in step 1 reads the proxy from the connection extra.
        proxy="http:///")
    teams_notification.execute(context)
 
# Task-level defaults, meant to be passed as DAG(default_args=...).
default_args = dict(
    owner='minjae93',
    retries=0,
    depends_on_past=False,
    on_failure_callback=on_failure,  # invoked when a task fails
)

3. AIRFLOW DAG 생성 (2단계의 on_failure 함수 선언과 default_args를 DAG 파일에 포함)

DAG는 TASK로 이루어지며, TASK는 작업의 단위이다. 이 예시에서 각 TASK는 SPARK 파일 하나를 실행한다.

from datetime import datetime, timedelta
  
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from ms_teams_webhook_operator import MSTeamsWebhookOperator
 
# ---------------------------------------------------------------------------
# Failure-alert code: paste the on_failure callback from step 2 here; the
# default_args below reference it.
# ---------------------------------------------------------------------------

default_args = dict(
    owner='minjae93',
    retries=0,
    depends_on_past=False,
    on_failure_callback=on_failure,  # invoked when a task fails
)
 
# Definition of the DAG.
dag_staging_daily = DAG(
    'staging_daily',  # DAG id
    # days_ago() expects a number of days, not a date string: days_ago('2022-10-26')
    # raises TypeError at parse time. Use an explicit start date instead.
    start_date=datetime(2022, 10, 26),
    default_args=default_args,
    schedule_interval='24 16 * * *',  # cron: every day at 16:24
    # catchup=True: the scheduler also creates runs for every interval missed
    # between start_date and now, not just the most recent one.
    catchup=True,
    is_paused_upon_creation=False,
)
# spark-submit command lines, one per Spark job in the DAG.
_SPARK_SUBMIT = 'spark-submit ~/airflow/sparkWorkspace/'
# NOTE(review): 'sparkReservationr.py' looks like a typo for 'sparkReservation.py' -- confirm.
cmdReservation = _SPARK_SUBMIT + 'sparkReservationr.py'
cmdApptracking = _SPARK_SUBMIT + 'sparkApptracking.py'
cmdGPASS = _SPARK_SUBMIT + 'sparkGPASS.py'
cmdLogin = _SPARK_SUBMIT + 'sparkLogin.py'
cmdMileage = _SPARK_SUBMIT + 'sparkMileage.py'
cmdPayment = _SPARK_SUBMIT + 'sparkPayment.py'
 
def _spark_task(task_id, command):
    """Create a BashOperator in dag_staging_daily that runs the given shell command."""
    return BashOperator(
        task_id=task_id,
        dag=dag_staging_daily,
        bash_command=command,
    )


# Dummy task marking the start of the pipeline.
task_start = DummyOperator(task_id='start', dag=dag_staging_daily)

# Dummy task marking the end. 'all_success' runs it only when every upstream task
# succeeded; 'all_done' would run it once they have all finished, whatever the outcome.
task_finish = DummyOperator(
    task_id='finish',
    trigger_rule='all_success',
    dag=dag_staging_daily,
)

# One Bash task per Spark job.
task_reservation = _spark_task('Reservation', cmdReservation)
task_apptracking = _spark_task('Apptracking', cmdApptracking)
task_gpass = _spark_task('GPASS', cmdGPASS)
task_login = _spark_task('Login', cmdLogin)
task_mileage = _spark_task('Mileage', cmdMileage)
task_payment = _spark_task('Payment', cmdPayment)
 
 
# Wire up dependencies: start -> each Spark job in sequence -> finish.
(
    task_start
    >> task_reservation
    >> task_apptracking
    >> task_gpass
    >> task_login
    >> task_mileage
    >> task_payment
    >> task_finish
)