# python引用数据库两种方式
# 方式一
代码语言:javascript
复制# -*- coding: UTF-8 -*-
import pymysql
import requests
import json
#建立连接
conn = pymysql.connect(host='##', port=3306, database='##', user='##', password='####')
#拿到游标
cursor = conn.cursor()
#执行sql语句
sql = "SELECT * FROM bh_job_exec WHERE job_cde='company' AND exec_status='SUCCESS' AND JSON_UNQUOTE(JSON_EXTRACT(exec_param, '$.businessDate')) = DATE_SUB(CURDATE(),INTERVAL 1 DAY);"
# print(sql)
def sendDing(msg):
dingding_url='https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
data = {"msgtype": "text","text": {"content": "告警:" str(msg)}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.dumps(data).encode('utf-8')
ret = requests.post(url=dingding_url,data=send_data,headers=headers)
try:
row = cursor.execute(sql)
# print(row)
# Python查询Mysql使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据。
data=cursor.fetchone()
#print(data)
#进行判断
if data:
#print("GFSS结算成功")
sendDing("GFSS结算成功")
else:
#print("GFSS结算失败")
sendDing("GFSS结算失败")
except:
print("Error: unable to fetch data")
finally:
cursor.close()
#关闭数据库
conn.close()
# 方式二
代码语言:javascript
复制#!/usr/bin/python3
# -*- coding: utf-8 -*-
import pymysql
import pickle
import os.path,sys
import urllib.request
import urllib.parse
import json
ding_url = "https://oapi.dingtalk.com/robot/send?access_token=xxxx"
def get_tabcol_current():
tabcol_dic = {}
## 目标库的信息
db = pymysql.connect(host='xxx',
port=13306,
user='xx',
password='xx',
database='xxx')
cursor = db.cursor()
sql = "SELECT * FROM `terminal_session` WHERE date_end is Null"
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
if row[1] in tabcol_dic:
tabcol_dic[row[1]].append(row[2])
else:
tabcol_dic[row[1]]=[row[2]]
cursor.close()
db.close()
# print(tabcol_dic)
return tabcol_dic
def send_msg_by_ding(message):
header = {
"Content-Type": "application/json"
}
data = {
"msgtype": "text",
"text": {"content": message}
}
send_data = json.dumps(data).encode('utf-8')
req = urllib.request.Request(ding_url, data=send_data, headers=header, method='POST')
ret = urllib.request.urlopen(req)
if ret.status != 200:
print("send message error!")
def main():
ret = get_tabcol_current()
# print(ret)
add_dic = ret
# add_dic = list(ret.values())
# del_dic = list(ret.keys())
message = "堡垒机告警:n"
if add_dic:
for (tab,cols) in add_dic.items():
cols.sort()
message = message tab " 登录了: " ', '.join(cols) "n"
if add_dic:
send_msg_by_ding(message)
# print(del_dic)
# print(1111)
if __name__ == '__main__':
main()