일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- SQL SERVER MIGRATION
- 가장 긴 바이토닉 부분 수열 파이썬
- 백준 11054.가장 긴 바이토닉 부분 수열
- 백준 2352 반도체 설계 파이썬
- 다중 컬럼 NOT IN
- 백준 1238 파티 파이썬
- 프로그래머스 여행경로
- 램프 파이썬
- SQL SERVER 장비교체
- 프로그래머스 가장 긴 팰린드롬
- 백준 1167 트리의 지름 파이썬
- 트리의 지름 파이썬
- 백준 2146 다리 만들기
- SWEA
- 백준 1043 거짓말 파이썬
- 등굣길 파이썬
- 백준 1516 게임 개발
- 프로그래머스 순위 파이썬
- 게임 개발 파이썬
- 가장 긴 팰린드롬 파이썬
- 프로그래머스 등굣길
- 순위 파이썬
- 베스트앨범 파이썬
- 백준 1034 램프 파이썬
- 반도체 설계 파이썬
- 다리 만들기 파이썬
- 백준 1613 역사
- 프로그래머스 순위
- 역사 파이썬
- 프로그래머스 베스트앨범
Archives
- Today
- Total
공부, 기록
[AIRFLOW, SPARK] AIRFLOW TEAMS 알람 설정 값 본문
1.아래 두 파이썬 파일을 dags 폴더에 추가
ms_teams_webhook_operator.py
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from ms_teams_webhook_hook import MSTeamsWebhookHook
import logging
class MSTeamsWebhookOperator(SimpleHttpOperator):
"""
This operator allows you to post messages to MS Teams using the Incoming Webhooks connector.
Takes both MS Teams webhook token directly and connection that has MS Teams webhook token.
If both supplied, the webhook token will be appended to the host in the connection.
:param http_conn_id: connection that has MS Teams webhook URL
:type http_conn_id: str
:param webhook_token: MS Teams webhook token
:type webhook_token: str
:param message: The message you want to send on MS Teams
:type message: str
:param subtitle: The subtitle of the message to send
:type subtitle: str
:param button_text: The text of the action button
:type button_text: str
:param button_url: The URL for the action button click
:type button_url : str
:param theme_color: Hex code of the card theme, without the #
:type message: str
:param proxy: Proxy to use when making the webhook request
:type proxy: str
"""
template_fields = ('message', 'subtitle',)
@apply_defaults
def __init__(self,
http_conn_id=None,
webhook_token=None,
message="",
title="",
subtitle="",
button_text="",
button_url="",
theme_color="00FF00",
proxy=None,
*args,
**kwargs):
super(MSTeamsWebhookOperator, self).__init__(endpoint=webhook_token, *args, **kwargs)
self.http_conn_id = http_conn_id
self.webhook_token = webhook_token
self.message = message
self.title = title
self.subtitle = subtitle
self.button_text = button_text
self.button_url = button_url
self.theme_color = theme_color
self.proxy = proxy
self.hook = None
def execute(self, context):
"""
Call the SparkSqlHook to run the provided sql query
"""
self.hook = MSTeamsWebhookHook(
self.http_conn_id,
self.webhook_token,
self.title,
self.message,
self.subtitle,
self.button_text,
self.button_url,
self.theme_color,
self.proxy
)
self.hook.execute()
logging.info("Webhook request sent to MS Teams")
ms_teams_webhook_hook.py
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.hooks.http_hook import HttpHook
from airflow.exceptions import AirflowException
class MSTeamsWebhookHook(HttpHook):
"""
This hook allows you to post messages to MS Teams using the Incoming Webhook connector.
Takes both MS Teams webhook token directly and connection that has MS Teams webhook token.
If both supplied, the webhook token will be appended to the host in the connection.
:param http_conn_id: connection that has MS Teams webhook URL
:type http_conn_id: str
:param webhook_token: MS Teams webhook token
:type webhook_token: str
:param message: The message you want to send on MS Teams
:type message: str
:param subtitle: The subtitle of the message to send
:type subtitle: str
:param button_text: The text of the action button
:type button_text: str
:param button_url: The URL for the action button click
:type button_url : str
:param theme_color: Hex code of the card theme, without the #
:type message: str
:param proxy: Proxy to use when making the webhook request
:type proxy: str
"""
def __init__(self,
http_conn_id=None,
webhook_token=None,
message="",
title="",
subtitle="",
button_text="",
button_url="",
theme_color="00FF00",
proxy=None,
*args,
**kwargs
):
super(MSTeamsWebhookHook, self).__init__(*args, **kwargs)
self.http_conn_id = http_conn_id
self.webhook_token = self.get_token(webhook_token, http_conn_id)
self.message = message
self.title = title
self.subtitle = subtitle
self.button_text = button_text
self.button_url = button_url
self.theme_color = theme_color
self.proxy = proxy
def get_proxy(self, http_conn_id):
conn = self.get_connection(http_conn_id)
extra = conn.extra_dejson
print(extra)
return extra.get("proxy", '')
def get_token(self, token, http_conn_id):
"""
Given either a manually set token or a conn_id, return the webhook_token to use
:param token: The manually provided token
:param conn_id: The conn_id provided
:return: webhook_token (str) to use
"""
if token:
return token
elif http_conn_id:
conn = self.get_connection(http_conn_id)
extra = conn.extra_dejson
return extra.get('webhook_token', '')
else:
raise AirflowException('Cannot get URL: No valid MS Teams '
'webhook URL nor conn_id supplied')
#메세지 내용 커스텀 부분
def build_message(self):
cardjson = """
{{
"@type": "MessageCard",
"@context": "http://schema.org/extensions",
"themeColor": "{2}",
"title": "{5}",
"text": "{0}"
"sections": [{{
"activityTitle": "Error Point",
"activitySubtitle": "{1}",
"markdown": true,
"potentialAction": [
{{
"@type": "OpenUri",
"name": "{3}",
"targets": [
{{ "os": "default", "uri": "{4}" }}
]
}}
]
}}]
}}
"""
return cardjson.format(self.message, self.subtitle, self.theme_color,
self.button_text, self.button_url, self.title)
def execute(self):
"""
Remote Popen (actually execute the webhook call)
:param cmd: command to remotely execute
:param kwargs: extra arguments to Popen (see subprocess.Popen)
"""
proxies = {}
proxy_url = self.get_proxy(self.http_conn_id)
print("Proxy is : " + proxy_url)
if len(proxy_url) > 5:
proxies = {'https': proxy_url}
self.run(endpoint=self.webhook_token,
data=self.build_message(),
headers={'Content-type': 'application/json'},
extra_options={'proxies': proxies})
2. AIRFLOW DAGS 파일에 on_failure 함수 선언 및 default_args에 추가
from ms_teams_webhook_operator import MSTeamsWebhookOperator
def on_failure(context):
dag_id = context['dag_run'].dag_id #실행중인 DAG
task_id = context['task_instance'].task_id #실패한 TASK
context['task_instance'].xcom_push(key=dag_id, value=True)
encodets = context['ts'].replace(':','%3A').replace('.','%2E').replace('+','%2B') #URL 특수문자 인코딩
logs_url = "hostname입력/log?dag_id={}&task_id={}&execution_date={}".format(
dag_id, task_id, encodets)
teams_notification = MSTeamsWebhookOperator(
task_id="msteams_notify_failure", trigger_rule="all_done",
message="`{}` has failed on task: `{}`".format(dag_id, task_id),
title='Airflow Error Alert',
subtitle = task_id,
button_text="View log", button_url=logs_url,
theme_color="FF0000", http_conn_id='airflow-alert' #AIRFLOW CONNECTIONS에 추가한 Connection_id
proxy = "http:///")
teams_notification.execute(context)
default_args = {
'owner': 'minjae93',
'retries': 0,
'depends_on_past': False,
'on_failure_callback': on_failure #실패한 경우 실행되는 함수
}
2. AIRFLOW DAGS 파일에 on_failure 함수 선언 및 default_args에 추가
AIRFLOW DAG 생성
DAG는 TASK로 이루어지며 TASK는 작업의 단위이다. 해당 예시에서 TASK는 각각의 SPARK 파일 실행을 의미함.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from ms_teams_webhook_operator import MSTeamsWebhookOperator
#
#
# 실패 알림 프로그램 코드
#
#
default_args = {
'owner': 'minjae93',
'retries': 0,
'depends_on_past': False,
'on_failure_callback': on_failure #실패한 경우 실행되는 함수
}
#해당 DAG의 정의
dag_staging_daily= DAG(
'staging_daily', #DAG 명칭
start_date=days_ago('2022-10-26'), #실행 일자
default_args=default_args,
schedule_interval='24 16 * * *', #반복 주기
catchup=True, #실행시간이 길어져서 동일한 DAG가 실해되려할 때 이전 작업을 끝낸 후 실행하는지의 여부
is_paused_upon_creation=False,
)
## SPARK 실행 cmd 커맨드 변수 선언
cmdReservation='spark-submit ~/airflow/sparkWorkspace/sparkReservationr.py'
cmdApptracking = 'spark-submit ~/airflow/sparkWorkspace/sparkApptracking.py'
cmdGPASS='spark-submit ~/airflow/sparkWorkspace/sparkGPASS.py'
cmdLogin = 'spark-submit ~/airflow/sparkWorkspace/sparkLogin.py'
cmdMileage='spark-submit ~/airflow/sparkWorkspace/sparkMileage.py'
cmdPayment = 'spark-submit ~/airflow/sparkWorkspace/sparkPayment.py'
#시작을 알리는 dummy
task_start = DummyOperator(
task_id='start',
dag=dag_staging_daily,
)
#끝을 알리는 dummy
task_finish = DummyOperator(
task_id='finish',
trigger_rule='all_success', #모든 TASK가 성공한 경우에 정상 종료 all_done : 모든 TASK가 작동한 경우에 정상 종료
dag=dag_staging_daily,
)
task_reservation = BashOperator(
task_id='Reservation',
dag=dag_staging_daily,
bash_command=cmdReservation,
)
task_apptracking= BashOperator(
task_id='Apptracking',
dag=dag_staging_daily,
bash_command=cmdApptracking,
)
task_gpass = BashOperator(
task_id='GPASS',
dag=dag_staging_daily,
bash_command=cmdGPASS,
)
task_login = BashOperator(
task_id='Login',
dag=dag_staging_daily,
bash_command=cmdLogin,
)
task_mileage = BashOperator(
task_id='Mileage',
dag=dag_staging_daily,
bash_command=cmdMileage,
)
task_payment = BashOperator(
task_id='Payment',
dag=dag_staging_daily,
bash_command=cmdPayment,
)
#의존관계 구성
task_start >> task_reservation >> task_apptracking >> task_gpass >> task_login >> task_mileage >> task_payment >> task_finish
'공부 > 소소한 개발' 카테고리의 다른 글
성능테스트, 부하 테스트, 스트레스 테스트 (0) | 2022.09.18 |
---|---|
데이터 처리 방식 (배치, 마이크로 배치) (0) | 2022.09.12 |
병렬처리와 분산처리 (0) | 2022.08.21 |
MSA (Micro Service Architecture) (0) | 2022.08.15 |
타입스크립트 (0) | 2022.08.06 |