设备终端通过调用http api来实现定位信息的上传,为了实现平台演示,模拟多个设备向平台发送http请求上传最新位置信息。
如果使用随机位置,设备的轨迹必然交叉杂乱,所以改用百度地图的公交线路 JSON 数据来模拟车辆的位置更新:车辆沿线路逐站前进,到达起点或终点时掉头。
也可以直接修改某设备对应redis中keepOnline_locator_X和recentCoordinate_locator_X来更新设备最新定位信息,因为web平台中各设备的在线状态和最新位置就是从它们获得。
python 代码如下:
代码语言:python
# -*- coding:utf-8 -*-
import requests
import random
import time
from datetime import datetime
from gpsMonitor_Model import *
import redis
import json
def main():
    """Run the locator-position emulator.

    Loads bus-line data from ``./lines12.js`` and then, every 3 seconds,
    moves each simulated vehicle one stop along its line and writes the new
    position straight into Redis.  Never returns: the update loop runs until
    the process is interrupted.

    The HTTP-based emulation (``emulateHttpRequsts`` and the ``Test*``
    helpers) is also defined here but is currently disabled; see the
    commented-out calls near the end of this function.
    """

    def initTest():
        """Load the line data and put every vehicle at stop 1, moving forward.

        Stores the first 50 entries of ``./lines12.js`` in the module-level
        ``lines`` list and adds ``direction`` / ``currentStop`` to each entry.
        """
        global lines
        # ``with`` closes the file even if json.load() raises.
        with open('./lines12.js', 'rb') as f:
            lines = json.load(f)[0:50]
        for line in lines:
            line['direction'] = 1      # 1 = towards the last stop, -1 = back
            line['currentStop'] = 1    # 1-based index into line['stops']

    def emulateRedis():
        """Advance the first 10 lines by one stop and publish them to Redis.

        For each line two keys are written, both suffixed with the line name:
        ``keepOnline_locator_<name>`` (set with an expiry of ``maxTimeout``)
        and ``recentCoordinate_locator_<name>`` holding
        ``"<lat>|<lng>|<YYYY-mm-dd HH:MM:SS>"``.
        """
        for line in lines[:10]:
            if line['direction'] == 1:
                if line['currentStop'] < len(line['stops']):
                    line['currentStop'] = line['currentStop'] + 1
                else:
                    # Reached the last stop: turn around.
                    line['direction'] = -1
                    line['currentStop'] = line['currentStop'] - 1
            else:
                if line['currentStop'] > 1:
                    line['currentStop'] = line['currentStop'] - 1
                else:
                    # Back at the first stop: turn around again.
                    line['direction'] = 1
                    line['currentStop'] = line['currentStop'] + 1
            currentStop = line['stops'][line['currentStop'] - 1]
            # Single-argument form prints the same text on Python 2 and 3.
            print('%s %s' % (line['name'], line['currentStop']))
            # NOTE(review): arguments are passed as (name, value, time).
            # Newer redis-py releases expect (name, time, value) - confirm
            # against the installed redis-py version.
            myRedis.setex('keepOnline_locator_' + str(line['name']),
                          str(line['name']),
                          maxTimeout)
            myRedis.set('recentCoordinate_locator_' + str(line['name']),
                        '|'.join([str(currentStop['lat']),
                                  str(currentStop['lng']),
                                  datetime.now().strftime('%Y-%m-%d %H:%M:%S')]))

    def emulateHttpRequsts():
        """Fire random HTTP operations against the platform, forever.

        Every 50 ms a random flag in 0..8 is drawn; flags 0-4 upload a
        coordinate.  The add-locator and remove-locator branches are
        currently disabled.
        """
        while True:
            time.sleep(0.05)
            operationFlag = random.randint(0, 100) % 9
            if operationFlag in (0, 1, 2, 3, 4):
                TestAddCoordinate()
            elif operationFlag in (5, 6):
                pass  # disabled: TestAddLocacor()
            else:
                pass  # disabled: TestRemoveLocacor()

    def initConsts():
        """Assign the default timeout and coordinate values.

        NOTE(review): every name assigned here is local to this function, so
        calling it has no effect on the rest of the script - confirm whether
        these were meant to be shared.
        """
        minTimeout = 60
        maxTimeout = 60 * 2
        defaultCoordinate_x = 34.730861
        defaultCoordinate_y = 113.6101392
        defaultCoordinate_time = datetime(2016, 12, 18, 3, 4, 5)

    def TestAddLocacor():
        """Create a locator over HTTP, rename it, and upload one coordinate.

        Relies on ``coordinates``, which is only assigned by the
        commented-out setup lines at the end of ``main``.
        """
        coordinate = coordinates[random.randint(0, coordinates.count() - 1)]
        response = requests.put('http://127.0.0.1:8000/locator',
                                data={'locator_name': 'miao',
                                      'locator_fleed': '1'})
        # The new locator id is the second whitespace-separated token of
        # the response body.
        locatorID = response.content.split()[1]
        response = requests.post('http://127.0.0.1:8000/locator',
                                 data={'locator_name': 'locator' + str(locatorID),
                                       'locator_fleed': '1',
                                       'itemId': locatorID})
        # Take the text after the last '=' in the final token, with any
        # surrounding '>' stripped.
        coordinate_locatorID = response.content.split()[-1].strip('>').split('=')[-1]
        requests.put('http://127.0.0.1:8000/coordinate',
                     data={'coordinate_locator': coordinate_locatorID,
                           'coordinate_x': coordinate.coordinate_x,
                           'coordinate_y': coordinate.coordinate_y})
        # Result is unused; presumably kept to verify the record exists -
        # confirm against the Locator model.
        locator = Locator.get(locatorID)
        print('add locator:' + str(locatorID))

    def TestRemoveLocacor():
        """Delete one random fleet-1 locator with id > 11 over HTTP.

        Does nothing (apart from printing a message) unless more than one
        such locator exists.
        """
        locators = Locator.select((Locator.q.locator_fleedID == 1) & (Locator.q.id > 11))
        if locators.count() > 1:
            locator = locators[random.randint(0, locators.count() - 1)]
            requests.delete('http://127.0.0.1:8000/locator',
                            data={'itemId': locator.id})
            print('remove locator' + str(locator.id))
        else:
            print('remove locator canceled')

    def TestAddCoordinate():
        """Upload a random stored coordinate for a random fleet-1 locator.

        Relies on ``coordinates``, which is only assigned by the
        commented-out setup lines at the end of ``main``.
        """
        coordinate = coordinates[random.randint(0, coordinates.count() - 1)]
        locators = Locator.select(Locator.q.locator_fleedID == 1)
        locator = locators[random.randint(0, locators.count() - 1)]
        requests.put('http://127.0.0.1:8000/coordinate',
                     data={'coordinate_locator': locator.id,
                           'coordinate_x': coordinate.coordinate_x,
                           'coordinate_y': coordinate.coordinate_y})
        print('add coordinate:' + str(coordinate.id) + ' on:' + str(locator.id))

    # HTTP-based emulation, currently disabled in favour of writing to
    # Redis directly:
    ## initConsts()
    ## initDatabase()
    ## coordinates=Coordinate.select()
    ## emulateHttpRequsts()

    # Expiry passed to setex() for the keepOnline_* keys.
    maxTimeout = 60 * 5
    myRedis = redis.Redis()
    initTest()
    while True:
        time.sleep(3)
        emulateRedis()
# Start the emulator only when this file is executed as a script.
if __name__ == '__main__':
    main()