1.环境
开通钉钉群自定义告警机器人，主要需确定安全设置是采用“加签”还是“自定义关键词”；本次采用关键词方式，告警关键词为“Flink任务”（机器人只会接收包含该关键词的消息）。
2.开发代码
2.1 告警信息发送python代码
代码语言:python
# -*- coding: UTF-8 -*-
import json
import time
import requests
class SendMesg(object):
    """Post plain-text alert messages to a DingTalk group robot webhook.

    Each message is formatted as::

        [<alert>{subject}]
        {content}
        time: {YYYY-mm-dd HH:MM:SS}

    NOTE: the robot is set up with keyword security, so the keyword must
    appear in ``subject`` or ``content`` for the message to be accepted.
    """

    def __init__(self, url="https://oapi.dingtalk.com/robot/send?xxxx", timeout=10):
        """Create a sender.

        :param url: robot webhook URL; defaults to the original placeholder.
        :param timeout: seconds to wait for the webhook. Without a timeout
            ``requests`` can wait forever, blocking the cron-driven run.
        """
        self.__headers = {'Content-Type': 'application/json;charset=utf-8'}
        self.__url = url
        self.__timeout = timeout

    @staticmethod
    def _build_text(subject, content, out_time):
        """Return the message text: bracketed subject line, content, timestamp."""
        out_subject = "[<alert>{0}]\n".format(subject)
        out_content = "{0}\ntime: {1}".format(content, out_time)
        return out_subject + out_content

    def sendMsg(self, subject, content):
        """Send one text message to the webhook.

        :param subject: short title shown in brackets on the first line.
        :param content: message body.
        :return: the response object returned by ``requests.post``.
        :raises requests.RequestException: propagated on connection failure
            or timeout (nothing is caught here).
        """
        # Stamp with the time of sending, not the time the sender was built,
        # so a reused instance never reports a stale time.
        out_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        data = {
            "msgtype": "text",
            "text": {"content": self._build_text(subject, content, out_time)},
        }
        return requests.post(url=self.__url, data=json.dumps(data),
                             headers=self.__headers, timeout=self.__timeout)
2.2 处理 YARN 任务列表：通过关键词（flink_ 前缀）匹配出正在运行的 Flink 任务名
代码语言:python
# -*- coding: UTF-8 -*-
import re
import sys
from app_info import Apps
from send_msg import SendMesg
# Holder of the Flink job names that should be running; its ``apps``
# collection is compared against the job names found in the YARN listing.
apps_info = Apps()
def get_job_names(text):
    """Return every Flink job name found in *text*.

    A job name is ``flink_`` followed by one or more word characters
    (letters, digits, underscore), e.g. ``flink_order_etl``. Matches are
    returned in order of appearance; duplicates are kept.

    :param text: text to scan, e.g. the output of ``yarn application -list``.
    :return: list of matched names; an empty list when nothing matches.
    """
    # re.findall always returns a list (never None), so no None check needed.
    return re.findall(r'flink_\w+', text)
def main():
    """Alert about expected Flink jobs that are missing from the YARN listing.

    Reads the application listing from stdin, extracts the Flink job names it
    contains, and, if any name in ``apps_info.apps`` is absent, sends a single
    "Flink任务挂掉" alert listing the missing jobs one per line.
    """
    text = sys.stdin.read()
    input_info = get_job_names(text)
    print(input_info)
    # Sorted so the alert text and log output are stable between runs
    # (a bare set difference has arbitrary order); key=str tolerates
    # non-string entries, which the original stringified anyway.
    out_info = sorted(set(apps_info.apps) - set(input_info), key=str)
    print(out_info)
    if out_info:
        sender = SendMesg()
        info_str = "\n".join(str(x) for x in out_info)
        print_info = sender.sendMsg("Flink任务挂掉", info_str)
        print(print_info)


if __name__ == '__main__':
    main()
2.3 配置sh执行
代码语言:bash
#! /bin/bash
# Load the hadoop user's environment so `yarn` and `python` resolve under cron.
source /home/hadoop/.bash_profile
# Snapshot the YARN application list once and reuse it below.
result=$(yarn application -list)
# Copy the raw listing into the cron log.
echo "$result"
# Quote the expansion: unquoted, the shell would word-split the listing and
# glob-expand any '*' in it against the current directory.
echo "$result" 2>/dev/null | python /mnt/disk1/flink_monitor/flink_job.py
2.4 配置对应调度执行
代码语言:bash
sudo crontab -u hadoop -e
# Every minute: run the check with bash (the script uses the bash builtin `source`), logging stdout and stderr.
*/1 * * * * /bin/bash /mnt/disk1/flink_monitor/flink_job.sh >> /mnt/disk1/flink_monitor/flink_job.log 2>&1