Python开发---使用requests库调用Web API

2020-11-04 11:36:33 浏览数 (1)

下面代码将自己系统的访问日志和事件告警定时通过Web API提交到其他系统

代码语言:python代码运行次数:0复制
# -*- coding:utf-8 -*-
import schedule
import requests
import json
from TOOLS import *
import time
from datetime import datetime,timedelta
eventUrl="上报事件API接口"

pcLogUrl='提交PC日志接口'

appLogUrl='提交APP日志接口'

waterEvent={
   "gridId": "",
    "eventTitle": "",
    "eventDesc": "",
    "eventLevel": "",
    "eventSource": "",
    "timeLimit": "",
    "longitude": "",
    "latitude": "",
    "pollutionKindId ":  "",
     
    "address": "",
 }


def sendEventHour(r):
    lastHour=dictStation[r['SiteId']]['lastHour']
     
    if((lastHour.get(r['Factor'])!=None) & (lastHour.get(r['Factor'])==r['RecordTime'])): 
        return 
     
    station=dictStation[r['SiteId']]
    waterEvent['gridId']=dictGrid[r['SiteId']]
    waterEvent['eventTitle']="%s%s小时数据超标报警"%(station['SiteName'],dictFactor22[r['Factor']])
    waterEvent['eventDesc']="时间:%s,监测值:%s,目标值:%s"%(r['RecordTime'],r['Value'],r['Target'])
    waterEvent['longitude']=station['Longitude']
    waterEvent['latitude']=station['Latitude']
    waterEvent['address']=station['SiteName']
      
    print(waterEvent)
    dictStation[r['SiteId']]['lastHour'][r['Factor']]=r['RecordTime']
    r=requests.post(eventUrl,{'waterEvent':json.dumps(waterEvent)})
    print(r.status_code)
def sendEventMinute(r):
    lastMinute=dictStation[r['SiteId']]['lastMinute']
    if((lastMinute.get(r['Factor'])!=None) & (lastMinute.get(r['Factor'])==r['RecordTime'])):    
        return 
            
    station=dictStation[r['SiteId']]
    waterEvent['gridId']=dictGrid[r['SiteId']]
    waterEvent['eventTitle']="%s%s分钟数据超标报警"%(station['SiteName'],dictFactor22[r['Factor']])
    waterEvent['eventDesc']="时间:%s,监测值:%s,目标值:%s"%(r['RecordTime'],r['Value'],r['Target'])
    waterEvent['longitude']=station['Longitude']
    waterEvent['latitude']=station['Latitude']
    waterEvent['address']=station['SiteName']
    print(waterEvent)
    dictStation[r['SiteId']]['lastMinute'][r['Factor']]=r['RecordTime']
    r=requests.post(eventUrl,{'waterEvent':json.dumps(waterEvent)})
    print(r.status_code)


def uploadLogs(data):
    logPath="data.txt"

    f=open(logPath,'r')
    idsDict=json.load(f)
    f.close()
    
    

    pcLogs=filter(lambda x:int(x['SourceType'])==1 and x['USERNAME']!=''and x['IDS'] not in idsDict['pc'],data)
    
    pcLogs=list(pcLogs)
    print(len(pcLogs))
    pcLogIds=list(map(lambda x:x['IDS'],pcLogs))

    appLogs=filter(lambda x:int(x['SourceType'])==0 and x['USERNAME']!='' and x['IDS'] not in idsDict['app'],data)
    appLogs=list(appLogs)
    print(len(appLogs))
    appLogIds=list(map(lambda x:x['IDS'],appLogs))
    
    if len(pcLogs)>0:
        pcLogs=map(lambda x:{
	"userId": x["USERNAME"],
	"appId": "your appId",
	"loginName": x["USERNAME"],
	"actionTime": x["OPERATETIME"].replace("T"," "),
	"action": x["SHORTNAME"] '---' x["CONTENTS"],
	"ip": x["COMPUTERIP"],
	"remark": "无"
        },pcLogs)
        pcLogs=list(pcLogs)
        r=requests.post(pcLogUrl,json=pcLogs)
        print(r.status_code)
        print(r.text)
        if(r.status_code==200):
            print(r.url)
            print(r.text)
            idsDict['pc'] =pcLogIds
            f=open(logPath,'w')
            json.dump(idsDict,f)
            f.close()

    if len(appLogs)>0:
        
        appLogs=map(lambda x:{
	"userId": x["USERNAME"],
	 
	"loginName": x["USERNAME"],
	"actionTime": x["OPERATETIME"].replace("T"," "),
	"action": x["SHORTNAME"] '---' x["CONTENTS"],
	"imei": x["COMPUTERIP"],
        "appPackage": "water",
	"remark": "无"
        },appLogs)
        appLogs=list(appLogs)
        r=requests.post(appLogUrl,json=appLogs)
        print(r.status_code)
        print(r.text)
        if(r.status_code==200):
            print(r.url)
            print(r.text)
            idsDict['app'] =appLogIds
            f=open(logPath,'w')
            json.dump(idsDict,f)
            f.close()
            
def hourJob():
    #print(dictStation)
    try:
        hourData=getHourAlarm()
    
        for i in hourData:
            sendEventHour(i)
    except:
        pass
        
def minuteJob():
    #print(dictStation)
    minuteData=getMinuteAlarm()
    for i in minuteData:
        sendEventMinute(i)
def logJob():
    today=datetime.now()
    print(today.strftime('%Y-%m-%d'))
    yesterday=today-timedelta(days=1)
    tomorrow=today timedelta(days=1)
    try:
        data=getLogs(yesterday.strftime('%Y-%m-%d'),tomorrow.strftime('%Y-%m-%d'))
        print(len(data))
        uploadLogs(data)
    except:
        pass
    

if __name__=='__main__':
     
    global dictStation
    global dictTarget
    dictStation=getStation()
     
    dictTarget=getTarget()
    schedule.every(3).hours.do(hourJob)
    schedule.every(10).minutes.do(logJob)
    #schedule.every(10).minutes.do(minuteJob)
    
    while True:
        schedule.run_pending() 

同时上面代码使用schedule库来运行定时任务。

0 人点赞