高质量编码-轨迹管理平台(模拟实时位置)

2020-12-09 17:39:17 浏览数 (1)

设备终端通过调用http api来实现定位信息的上传,为了实现平台演示,模拟多个设备向平台发送http请求上传最新位置信息。

使用随机位置,设备的轨迹肯定交叉杂乱,所以使用百度地图的公交线路json数据模拟车辆的位置更新,当到起点和终点时掉头。

也可以直接修改某设备对应redis中keepOnline_locator_X和recentCoordinate_locator_X来更新设备最新定位信息,因为web平台中各设备的在线状态和最新位置就是从它们获得。

python 代码如下:

代码语言:python代码运行次数:0复制
# -*- coding:utf-8 -*-
import requests
    
import random
import time
from datetime import datetime

from gpsMonitor_Model import *
import redis
import json
def main():
    def initTest():
        f=open('./lines12.js','rb')
        global lines
        lines=json.load(f)[0:50]
        f.close()
        for line in lines:
            line['direction']=1
            line['currentStop']=1

    def emulateRedis():           
        for line in lines[:10]:
            if line['direction']==1:
                if line['currentStop']<len(line['stops']):
                    line['currentStop']=line['currentStop'] 1
                else:
                    line['direction']=-1
                    line['currentStop']=line['currentStop']-1
            else:
                if line['currentStop']>1:
                    line['currentStop']=line['currentStop']-1
                else:
                    line['direction']=1
                    line['currentStop']=line['currentStop'] 1
                    
            
            currentStop=line['stops'][line['currentStop']-1]
            print line['name'],line['currentStop']
            myRedis.setex('keepOnline_locator_' str(line['name']),str(line['name']),maxTimeout)
            myRedis.set('recentCoordinate_locator_' str(line['name']),
                        '|'.join([str(currentStop['lat']),
                                  str(currentStop['lng']),
                                  datetime.strftime(datetime.now(),'%Y-%m-%d %H:%M:%S')]))
    
    def emulateHttpRequsts():
        while True:          
            time.sleep(0.05)
            operationFlag=random.randint(0,100)%9
        
            if operationFlag in [0,1,2,3,4]:
                TestAddCoordinate()

            elif operationFlag in [5,6]:
                None
                #TestAddLocacor()
            else:
                None
                #TestRemoveLocacor()
    def initConsts():
        minTimeout=60
        maxTimeout=60*2
        defaultCoordinate_x=34.730861
        defaultCoordinate_y=113.6101392
        defaultCoordinate_time=datetime(2016, 12, 18, 3, 4, 5)  
       
    def TestAddLocacor():
       
       coordinate=coordinates[random.randint(0,coordinates.count()-1)]
       response=requests.put('http://127.0.0.1:8000/locator',data={'locator_name':'miao','locator_fleed':'1'})
       locatorID=response.content.split()[1]
       response=requests.post('http://127.0.0.1:8000/locator',data={'locator_name':'locator' str(locatorID),'locator_fleed':'1','itemId':locatorID})
    
       coordinate_locatorID=response.content.split()[-1].strip('>').split('=')[-1]
       requests.put('http://127.0.0.1:8000/coordinate',data={'coordinate_locator':coordinate_locatorID,'coordinate_x':coordinate.coordinate_x,'coordinate_y':coordinate.coordinate_y})

       locator=Locator.get(locatorID)
        
       print 'add locator:' str(locatorID)                        
    def TestRemoveLocacor():
        locators=Locator.select((Locator.q.locator_fleedID==1)&(Locator.q.id>11))
        if locators.count()>1:
            locator=locators[random.randint(0,locators.count()-1)]
            requests.delete('http://127.0.0.1:8000/locator',data={'itemId':locator.id})
            print 'remove locator' str(locator.id)
        else:
            print 'remove locator canceled'
    def TestAddCoordinate():
        coordinate=coordinates[random.randint(0,coordinates.count()-1)]
        locators=Locator.select(Locator.q.locator_fleedID==1)
        locator=locators[random.randint(0,locators.count()-1)]
        requests.put('http://127.0.0.1:8000/coordinate',data={'coordinate_locator':locator.id,'coordinate_x':coordinate.coordinate_x,'coordinate_y':coordinate.coordinate_y})
        print 'add coordinate:' str(coordinate.id) ' on:' str(locator.id)
##    initConsts()
##    initDatabase()
##    coordinates=Coordinate.select()
##    emulateHttpRequsts()
    maxTimeout=60*5
    myRedis=redis.Redis()
    initTest()
    while True:
        time.sleep(3)
        emulateRedis()
if __name__=='__main__':
    main()

0 人点赞