脚本思路来自rsbeat,网上有优化过的版本代码 https://github.com/154650362/rsbeat
脚本主要分3步
代码语言:javascript复制# 1 slowlog get N
# 2 send data to ELK
# 3 slowlog reset
代码概要如下:
代码语言:javascript复制# -*- coding: utf-8 -*-
# 轮询采集Redis的slowlog并写上报到ELK中 (参考rsbeat,将daemonset模式改为脚本轮询)
# 步骤:
# 1 slowlog get N
# 2 send data to ELK
# 3 slowlog reset
# TIPS 我这里是集成到django项目里的,因为用了很多的django里面定的变量。如果要拆成单独脚本稍微改改就可以了。
import json
import os
import sys
from elasticsearch import Elasticsearch
import redis
import datetime
import pytz
project_path = os.path.abspath("../..")
sys.path.append(project_path)
os.environ["DJANGO_SETTINGS_MODULE"] = "django_demo.settings"
import django
django.setup()
from django_demo import configs
from utils import db_conn
from utils.simple_encrtypt_decryption import decrypt
es = Elasticsearch(configs.ELK_ES_HOST)
def send_log_to_elk():
# 查数据库,找到全部的redis节点信息
mydb = db_conn.conn_db(
host=configs.DB_HOST,
port=configs.DB_PORT,
user=configs.DB_USER,
passwd=configs.DB_PASSWORD,
)
mycursor = mydb.cursor()
# 只关注生产的Redis实例
query_sql = """
SELECT
instance_cluster,
instance_desc,
instance_addr,
instance_port,
instance_role,
redis_password
FROM
django_demo.db_instance
WHERE
instance_engine = 'Redis'
AND instance_env = '生产'
;
"""
mycursor.execute(query_sql)
myresult = mycursor.fetchall()
for i in myresult:
(
instance_cluster,
instance_desc,
instance_addr,
instance_port,
instance_role,
redis_password,
) = i
# 如果redis是有密码的,则先解密下
if redis_password:
plain_redis_password = decrypt(i[5])
r = redis.Redis(
host=str(i[2]),
port=int(i[3]),
decode_responses=False,
socket_connect_timeout=2,
password=plain_redis_password,
)
else:
r = redis.Redis(
host=str(i[2]),
port=int(i[3]),
decode_responses=False,
socket_connect_timeout=2,
password="",
)
slowlogs = r.slowlog_get(200) # 这个值改大点
for slowlog in slowlogs:
ts = slowlog["start_time"]
# 这里转成UTC时间格式,便于Kibana展示
action_time = datetime.datetime.fromtimestamp(ts, pytz.UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
duration_ms = int(slowlog["duration"]) / 1000 # 微秒转成毫秒
cmd = slowlog[
"command"
]
content = {
"@timestamp":str(action_time),
"instance_cluster":str(i[0]),
"instance_desc":str(i[1]),
"instance_role":str(i[4]),
"host_port":str(i[2]) "_" str(i[3]),
"duration_ms": duration_ms,
"cmd":str(cmd),
}
# print(content)
# 2 将扫到的redis slowlog 写入到ELK中
response = es.index(index="redis_slowlog", body=json.dumps(content))
print(response)
# 3 将源端redis 慢日志清掉
res3 = r.slowlog_reset()
print(res3)
if __name__ == "__main__":
send_log_to_elk()
ES里面的索引和mapping需要提前创建好
1、首次需要到ELK上去创建索引并设置mapping (注意 @timestamp设置为date类型)
代码语言:javascript复制PUT redis_slowlog
{
"mappings":{
"properties":{
"@timestamp":{
"type":"date"
},
"instance_cluster":{
"type":"keyword"
},
"instance_desc":{
"type":"keyword"
},
"host_port":{
"type":"text"
},
"duration_ms":{
"type":"float"
},
"cmd":{
"type":"text"
},
"instance_role":{
"type":"text"
}
}
}
}
命令行方式的查询(注意用UTC时间)
代码语言:javascript复制POST redis_slowlog/_search
{
"query":{
"bool":{
"must":[
{
"range":{
"@timestamp":{
"gte":"2023-09-10T05:11:11Z",
"lte":"2023-09-11T11:35:31Z"
}
}
},
{
"match":{
"cmd":{
"query":"celery",
"fuzziness":"AUTO"
}
}
}
]
}
}
}
定时任务,我这里用的是django celery。 如果是独立脚本的话,用linux的crontab也可以。
kibana看板最终效果类似如下: