Python开发---使用subprocess从命令行程序获取数据

2020-11-04 10:38:18 浏览数 (1)

发现一个简单的解决不同语言开发的程序之间调用对方函数获取数据的方法,就是使用命令行作为数据流的接口。

下面举例说明。

比如可以用一个C# 开发一个命令行程序。程序使用命令行参数,来选择调用不同函数,同时传入其他函数调用需要的参数,返回结果用JSON格式输出到命令行。

下面的C#命令行程序,根据args[0]来选择调用不同的函数,其他args参数作为调用具体函数的参数。返回结果以JSON格式输出到命令行。

代码语言:c#复制
private static void Main(string[] args)
{
    helper helper = new helper();
    string str = args[0];
    if (!(str == "1"))
    {
        if (!(str == "2"))
        {
            if (!(str == "3"))
            {
                if (str == "4")
                {
                    string startDate = args[1];
                    string endDate = args[2];
                    Console.WriteLine(JsonConvert.SerializeObject(helper.GetUserLogList(0, 0, startDate, endDate, 0, 2, "", "")));
                }
                return;
            }
            Console.WriteLine(JsonConvert.SerializeObject(helper.GetTargetValueList(0L, "0", DateTime.Now.Year.ToString(), DateTime.Now.Year.ToString())));
            return;
        }
    }
    else
    {
        Console.WriteLine(JsonConvert.SerializeObject(helper.GetRealTimeMonitorList(0L, "0", "F02,F03,F04,F05,F15,F16,F13,F08,F09,F07,F11,F10", 1)));
        return;
    }
    Console.WriteLine(JsonConvert.SerializeObject(helper.GetMinutesDataList(0, 0, 0L, "", "F02,F03,F04,F05,F15,F16,F13,F08,F09,F07,F11,F10", DateTime.Now.ToString("yyyy-MM-dd"), DateTime.Now.ToString("yyyy-MM-dd"))));
}

在Python来调用它,其实就是带上参数来运行上面的命令行程序来调用不同的函数,并获取返回数据。这时候使用的是

代码语言:python代码运行次数:0复制
# -*- coding:utf-8 -*-
import json
import subprocess


cmdPath='C#控制台程序的文件路径'
dictFactor1 = {'F02': 'cod', 'F03': 'nh', 'F04': 'p', 'F05': 'n'}
dictFactor2={'水质类别': 'level', 'COD': 'cod', '总磷': 'p', '氨氮': 'nh', '总氮': 'n'}

dictFactor11=dict(zip(dictFactor1.values(),dictFactor1.keys()))
dictFactor22=dict(zip(dictFactor2.values(),dictFactor2.keys()))

dictGrid={"1037068501":"1035","1037068502":"1042","1037068503":"1045",
          "1037068504":"1045","1037068506":"1045","1037068508":"1044",
          "1037068505":"1045","1037068507":"1044"}

global dictStation
global dictTarget




def getFactor1(r):
    d1=dict(map(lambda x:[x["ItemCode"],x["ItemValue"]],r))
    d2=dict(map(lambda x:[x[1],d1.get(x[0])],dictFactor1.items()))
    return d2

def getFactor2(r):
    d1=dict(map(lambda x:[x[1],r.get(x[0])],dictFactor1.items()))
    return d1

def getFactor3(r):
    d1=dict(map(lambda x:[x[1],r.get(x[0] "Target")],dictFactor1.items()))
    return d1
def getStation():
    p=subprocess.Popen([cmdPath,"1"],
                       stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result,stderr=p.communicate()
    hourData=json.loads(result.decode('gbk').strip())['SiteList']
    global dictStation
    dictStation=dict(map(lambda x:[x['SiteId'],{'SiteName':x['SiteName'],'SiteId':x['SiteId'],
                  'Longitude':x['Dongitude'],'Latitude':x['Dimensionality'],'lastHour':{},'lastMinute':{}}],hourData))
    return dictStation

def getTarget():
    p=subprocess.Popen([cmdPath,"3"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

    result,stderr=p.communicate()
    targetData=json.loads(result.decode('gbk').strip())
    global dictTarget
    dictTarget=dict(map(lambda x:[x['SiteId'],getFactor3(x)],targetData))
    return dictTarget

def getAlarm(data):
    alarms=[]      
    for i in data:
        for k,v in dictTarget[i['SiteId']].items():
            if(i['factors'][k]==''):
                continue
            value=float(str(i['factors'][k]).replace('-','0'))
            if(value>float(v)):
                alarms.append({'SiteId':i['SiteId'],'RecordTime':i['RecordTime'],'Factor':k,'Value':value,'Target':float(v)})
    return alarms

def getHourAlarm():
    p=subprocess.Popen([cmdPath,"1"],
                       stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result,stderr=p.communicate()
    hourData=json.loads(result.decode('gbk').strip())['SiteList']
    hourData=filter(lambda x:x.get('RecordTime'),hourData)
    hourData=list(map(lambda x:{'SiteName':x['SiteName'],'SiteId':x['SiteId'],'RecordTime':x['RecordTime'],
                  'Longitude':x['Dongitude'],'Latitude':x['Dimensionality'],
                  'factors':getFactor1(x['ItemDataList'])},hourData))
    #hourData=list(filter(lambda x :x['SiteId'] in ['37068506'],hourData))

    result=getAlarm(hourData)
    return result

def getMinuteAlarm():
    p=subprocess.Popen([cmdPath,"2"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

    result,stderr=p.communicate()
    minuteData=json.loads(result.decode('gbk').strip())['rows']
    
    minuteData=list(map(lambda x:{'SiteName':x['SiteName'],'SiteId':x['SiteID'],'RecordTime':x['MonitorTime'],
                            'factors':getFactor2(x)},minuteData))
    #minuteData=list(filter(lambda x :x['SiteId'] in ['37068506'],minuteData))
    result=getAlarm(minuteData)
    return result


def getLogs(start,end):
    p=subprocess.Popen([cmdPath,"4",start,end],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result,stderr=p.communicate()

    result=json.loads(result.decode('gbk').strip())
    return result


            
        
 
    
  
    

通过subprocess来调用可以传入参数的命令行程序并获取返回结果。这样就可以将C#语言的不同的函数包装成了Python语言的不同函数。

0 人点赞