用云函数快速批量处理COS里面的日志

2022-03-10 15:13:20 浏览数 (2)

本来CLS日志服务可以直接触发云函数来快速处理的,不过这样触发次数就有点多了,比如说被处理的日志本来就是云函数生成的,那么函数触发次数就直接要翻番,如果日志不是需要及时处理的话,可以让它在CLS里面累计上几分钟,然后用定时器触发一个云函数,通过日志服务查询接口 SearchLog 来实现批量处理。

但是这样做有个坑,如果这几分钟的log条数超过100条的话,我们可以把Limit放到最大1000,如果超过1000条的话,按照文档的说明,应该通过传递Context的方式来实现翻页,最多可以翻出来10000条。然而,文档上说的操作根本是无法实现的,因为如果你翻页查询的时候传递了Query参数,那就会被认为你要进行一次新的查询,然后给你返回第一页,即使Query参数和上一次查询一模一样也没有用。如果你不传Query参数只传Context参数呢,那你只会收到一个缺少Query参数的错误,因为Query是必选参数。

就算你用SQL查询和SQL翻页的方式(通过给SQL传递offset和limit),如果这几分钟的log超过了一万条,你还是没辙。所以更好的方式可能是每分钟通过 日志下载接口 来下载指定时间段的日志处理,或者把日志自动投递到COS,用COS的创建文件事件做触发器来触发云函数执行,然后把日志文件下载过来批量处理。

当日志非常多的时候,通过日志下载接口需要自行处理分包的问题,用投递COS的方式处理的话分包也是自动处理的,代码逻辑会更简单一些。但是CLS投递到COS的最短周期是5分钟,但是实际上一个日志从生成到打包、上传,触发处理,可能要经过接近10分钟,如果需要更及时的处理数据的话,只要确保文件不会大到需要分包,用定时器来触发可能更合适。

这样一个日志文件可能会非常大,如果整个文件读到内存中处理的话需要给云函数申请足够多的内存。更好的方式是用流的方式来处理。因为COS的sdk可以把文件读成流,日志下载接口生成的日志文件也可以用request读成流。这样即使文件非常大,也可以通过流处理的方式进行实时解压(因为日志打包的时候会被强制自动压缩),并对流进行实时解析,实现高效的日志处理:

代码语言:javascript复制
'use strict';
const zlib = require("zlib"),
	readLine = require('readline'),
	COS = require('cos-nodejs-sdk-v5'),
	cos = new COS({
		SecretId: process.env.SECRET_ID,
		SecretKey: process.env.SECRET_KEY,
		KeepAlive: false
	}),
	tencentcloud = require("/opt/node_modules/tencentcloud-sdk-nodejs"), //node14之前的版本的内置SDK不支持cls,需要下载新的sdk用层的方式覆盖进去并用这个方式引用
	ClsClient = tencentcloud.cls.v20201016.Client,
	clientConfig = {
		credential: {
			secretId: process.env.SECRET_ID,
			secretKey: process.env.SECRET_KEY,
		},
		region: process.env.TENCENTCLOUD_REGION,
		profile: {
			httpProfile: {
				endpoint: "cls.tencentcloudapi.com",
			},
		},
	},
	downloadCycle = 60000, //下载周期
	request = require('request');
exports.main_handler = (event, context, callback) => {
	let gunzip = zlib.createGunzip();
	let jsonCount = 0,
		invalidLines = 0;
	let rl = readLine.createInterface(gunzip);
	rl.on('line', function(line) {
		if (/^s*{.*}s*$/.test(line)) {
			jsonCount  ; //收到的行数据似乎是JSON数据
		} else {
			invalidLines  ; //收到的一行似乎不是JSON数据
		}
	})
	rl.on('close', () => {
		//因为是demo,这里没有等待所有的可能的并发流都处理完再回调
		//实际使用的时候应该Promise.all或者用异步方式逐个流处理完再回调。
		callback(null, "有效行数: "   jsonCount   " ,无效行数:"   invalidLines)
	})
	if ("Type" in event && event.Type == "Timer") { //定时器触发
		let client = new ClsClient(clientConfig);
		let params = {
			"TopicId": process.env.TopicId,
			"Query": "SCF_Namespace:""   process.env.SCF_NAMESPACE   """,
			"Count": 10000000, //最大一次下载1000万条。如果一个周期内有可能超过的话需要考虑分包或者该用cos投递
			"From": (Math.floor(Date.now() / downloadCycle - 1) * downloadCycle),
			"To": (Math.floor(Date.now() / downloadCycle) * downloadCycle) //触发点之前的一个完整的时间周期
		};
		//生成日志打包任务,定时器下次触发的时候下载处理
		client.CreateExport(params).then(res => {
			if ("ExportId" in res) {
				console.log("导出任务发起成功,生成的文件等待定时器下次触发的时候处理");
				client.DescribeExports({
					"TopicId": process.env.TopicId
				}).then(res => {
					if ("Exports" in res && res.Exports instanceof Array) {
						for (let i = 0; i < res.Exports.length; i  ) {
							let e = res.Exports[i];
							if (e.Status == "Completed") {
								request(e.CosPath).pipe(gunzip);
								client.DeleteExport({"ExportId": e.ExportId});//如果要更保险一点,放到readline的close后面删除更好
							}
						}
					}
				})
			} else {
				console.log("导出任务发起失败,需要告警并走重试逻辑");
			}
		})
	} else if ("Records" in event && event.Records instanceof Array) { //COS触发
		let records = event.Records;
		for (let i = 0; i < records.length; i  ) {
			let c = records[i].cos;
			console.log("下载并分析日志文件 "   c.cosObject.key)
			cos.getObject({
				Bucket: c.cosBucket.name   "-"   c.cosBucket.appid,
				Region: c.cosBucket.cosRegion,
				Key: "/"   c.cosObject.key.split("/").slice(3).join("/"),
				Output: gunzip
				//            Output: "/tmp/test.gz"
			}, function(err, data) {
				if (err) console.log("err:"   JSON.stringify(err))
			});
		}
	} else {
		return "未知触发器"
	}
};

0 人点赞