用云函数操作时序数据库CTSDB

2020-04-13 11:22:56 浏览数 (1)

最近优化一个iot系统。系统未来的需求是比当前多4个数量级的设备接入量,因此打算用时序数据库CTSDB和云函数来大幅度提升数据处理能力,写了一个简单的云函数测试了一下CTSDB的主要读写接口,同时用ab压测了一下性能。云函数和时序数据库的性能表现都很给力,一个乞丐版的数据库最低配置轻松就达到过万QPS的写能力。

代码语言:javascript复制
'use strict';
const request = require('request'),
db_username=process.env.DB_USERNAME,
db_password = process.env.DB_PWD,
db_ip = process.env.DB_IP,
db_port = process.env.DB_PORT,
db_url = "http://" db_username ":" db_password "@" db_ip ":" db_port,
moveCount = parseInt(process.env.MOVE_COUNT),
metricMap = {
	"tags":
	{
		"yakID":"string",
		"pastureID":"string",
		"MAC":"string"
	},
	"time":
	{
		"name":"timestamp",
		"format":"epoch_millis"
	},
	"fields":
	{
		"range1":"short",
		"range2":"short",
		"range3":"short",
		"range4":"short",
		"range5":"short"
	},
	"options":
	{
		"expire_day":100,
		"refresh_interval":"10s",
		"number_of_shards":24
	}
};
let lastRecord;
exports.main_handler = async (event, context, callback) => {
	var metricName = "yakMove";
	var url = db_url "/_metric/" metricName,res;
    var log = [];
	//获取CTSDB基本信息
	res = await req("get",{url:db_url,headers:{"content-type": "application/json"}})
	console.log(JSON.stringify(res))
    log.push("兼容ElasticSearch版本:" res.version.number);
    log.push("lucene版本" res.version.lucene_version);

	/** */
	if((await req("get",{url:url,headers:{"content-type": "application/json"}})).status==200){
		log.push("metric已经存在");
		//删除metric
		res = await req("delete",{
			url:url,
			headers:{"content-type": "application/json"} 
		})
		log.push("删除metric" metricName "操作结果" (res.message?res.message:res.reason))
	}
/**/
	if((await req("get",{url:url,headers:{"content-type": "application/json"}})).status!=200){
		log.push("metri不存在");
		//创建metric
		res = await req("put",{
			url:url,
			headers:{"content-type": "application/json"} ,
			body:JSON.stringify(metricMap)
		})
		log.push("新增metric" metricName "操作结果" (res.message?res.message:res.reason))
	}else{
		log.push("metric " metricName " 已经存在,测试插入" moveCount "条数据")
	}

	url = db_url "/" metricName "/doc/_bulk"
	//生成随机数据
	var data = genBulkData(moveCount);
	var d=new Date()
	//插入数据到metric中
	res = await req("post",{
		url:url,
		headers:{"content-type": "application/json"} ,
		body:data
	});
	console.log("操作结果" JSON.stringify(res))
	if(res.error){
		console.log("出错了!");
		log.push("出现错误:" JSON.stringify(res.error) " 导致批量失败")		
	}else{
		if(res.errors){
			for(var i=0;i<res.items.length;i  ){
				if(res.items[i].index.error){
					log.push("出现错误:" JSON.stringify(res.items[i].index.error))
				}
			}			
		}else{
			console.log("全部写入成功");
		}
		log.push("插入" moveCount "条记录用时" (new Date()-d) "毫秒,数据库用时" res.took "毫秒,平均每条" res.took/moveCount "毫秒");
	}
	console.log("n" log.join("n"))
	callback(null,log.join(" "))
	/**/
};
function req(method,reqbody){
	return new Promise(resolve => {
		request[method](reqbody,(error, response, data)=>{
			resolve(JSON.parse(data));
		});
	})
}
var counter=0
function getRandomMove(){
	
	return {
		"yakID":Math.round(Math.random()*100000000),
		"pastureID":Math.round(Math.random()*100000000),
		"MAC":"00 00 00 00 00 00",
		"timestamp":new Date().getTime(),
		"range1":(counter   == 1)?(99999999999999999999999999999):Math.round(Math.random()*1000),
		"range2":Math.round(Math.random()*2000),
		"range3":Math.round(Math.random()*3000),
		"range4":Math.round(Math.random()*2000),
		"range5":Math.round(Math.random()*1000)		
	}
}
function genBulkData(n){
	var result = []
	for(var i=0;i<n;i  ){
		result.push('{"index":{}}');
		result.push(JSON.stringify(getRandomMove()))
	}
	lastRecord = result[n-1];
	console.log("last inserted record:" JSON.stringify(lastRecord))
	return result.join("n") "n"
}

0 人点赞