最近优化一个iot系统。系统未来的需求是比当前多4个数量级的设备接入量,因此打算用时序数据库CTSDB和云函数来大幅度提升数据处理能力,写了一个简单的云函数测试了一下CTSDB的主要读写接口,同时用ab压测了一下性能。云函数和时序数据库的性能表现都很给力,一个乞丐版的数据库最低配置轻松就达到过万QPS的写能力。
代码语言:javascript复制'use strict';
const request = require('request'),
db_username=process.env.DB_USERNAME,
db_password = process.env.DB_PWD,
db_ip = process.env.DB_IP,
db_port = process.env.DB_PORT,
db_url = "http://" db_username ":" db_password "@" db_ip ":" db_port,
moveCount = parseInt(process.env.MOVE_COUNT),
metricMap = {
"tags":
{
"yakID":"string",
"pastureID":"string",
"MAC":"string"
},
"time":
{
"name":"timestamp",
"format":"epoch_millis"
},
"fields":
{
"range1":"short",
"range2":"short",
"range3":"short",
"range4":"short",
"range5":"short"
},
"options":
{
"expire_day":100,
"refresh_interval":"10s",
"number_of_shards":24
}
};
let lastRecord;
exports.main_handler = async (event, context, callback) => {
var metricName = "yakMove";
var url = db_url "/_metric/" metricName,res;
var log = [];
//获取CTSDB基本信息
res = await req("get",{url:db_url,headers:{"content-type": "application/json"}})
console.log(JSON.stringify(res))
log.push("兼容ElasticSearch版本:" res.version.number);
log.push("lucene版本" res.version.lucene_version);
/** */
if((await req("get",{url:url,headers:{"content-type": "application/json"}})).status==200){
log.push("metric已经存在");
//删除metric
res = await req("delete",{
url:url,
headers:{"content-type": "application/json"}
})
log.push("删除metric" metricName "操作结果" (res.message?res.message:res.reason))
}
/**/
if((await req("get",{url:url,headers:{"content-type": "application/json"}})).status!=200){
log.push("metri不存在");
//创建metric
res = await req("put",{
url:url,
headers:{"content-type": "application/json"} ,
body:JSON.stringify(metricMap)
})
log.push("新增metric" metricName "操作结果" (res.message?res.message:res.reason))
}else{
log.push("metric " metricName " 已经存在,测试插入" moveCount "条数据")
}
url = db_url "/" metricName "/doc/_bulk"
//生成随机数据
var data = genBulkData(moveCount);
var d=new Date()
//插入数据到metric中
res = await req("post",{
url:url,
headers:{"content-type": "application/json"} ,
body:data
});
console.log("操作结果" JSON.stringify(res))
if(res.error){
console.log("出错了!");
log.push("出现错误:" JSON.stringify(res.error) " 导致批量失败")
}else{
if(res.errors){
for(var i=0;i<res.items.length;i ){
if(res.items[i].index.error){
log.push("出现错误:" JSON.stringify(res.items[i].index.error))
}
}
}else{
console.log("全部写入成功");
}
log.push("插入" moveCount "条记录用时" (new Date()-d) "毫秒,数据库用时" res.took "毫秒,平均每条" res.took/moveCount "毫秒");
}
console.log("n" log.join("n"))
callback(null,log.join(" "))
/**/
};
function req(method,reqbody){
return new Promise(resolve => {
request[method](reqbody,(error, response, data)=>{
resolve(JSON.parse(data));
});
})
}
var counter=0
function getRandomMove(){
return {
"yakID":Math.round(Math.random()*100000000),
"pastureID":Math.round(Math.random()*100000000),
"MAC":"00 00 00 00 00 00",
"timestamp":new Date().getTime(),
"range1":(counter == 1)?(99999999999999999999999999999):Math.round(Math.random()*1000),
"range2":Math.round(Math.random()*2000),
"range3":Math.round(Math.random()*3000),
"range4":Math.round(Math.random()*2000),
"range5":Math.round(Math.random()*1000)
}
}
function genBulkData(n){
var result = []
for(var i=0;i<n;i ){
result.push('{"index":{}}');
result.push(JSON.stringify(getRandomMove()))
}
lastRecord = result[n-1];
console.log("last inserted record:" JSON.stringify(lastRecord))
return result.join("n") "n"
}