采集Redis的慢查询到ELK展示

2023-09-11 15:02:20 浏览数 (1)

脚本思路来自rsbeat,网上有优化过的版本代码 https://github.com/154650362/rsbeat

脚本主要分3步

代码语言:javascript复制
# 1 slowlog get N
# 2 send data to ELK
# 3 slowlog reset

代码概要如下:

代码语言:javascript复制
# -*- coding: utf-8 -*-
# 轮询采集Redis的slowlog并写上报到ELK中 (参考rsbeat,将daemonset模式改为脚本轮询)
# 步骤:
# 1 slowlog get N
# 2 send data to ELK
# 3 slowlog reset

# TIPS 我这里是集成到django项目里的,因为用了很多的django里面定的变量。如果要拆成单独脚本稍微改改就可以了。


import json
import os
import sys
from elasticsearch import Elasticsearch
import redis
import datetime
import pytz

project_path = os.path.abspath("../..")
sys.path.append(project_path)
os.environ["DJANGO_SETTINGS_MODULE"] = "django_demo.settings"

import django

django.setup()
from django_demo import configs
from utils import db_conn
from utils.simple_encrtypt_decryption import decrypt


es = Elasticsearch(configs.ELK_ES_HOST)

def send_log_to_elk():

    # 查数据库,找到全部的redis节点信息
    mydb = db_conn.conn_db(
        host=configs.DB_HOST,
        port=configs.DB_PORT,
        user=configs.DB_USER,
        passwd=configs.DB_PASSWORD,
    )

    mycursor = mydb.cursor()

    # 只关注生产的Redis实例
    query_sql = """
        SELECT
            instance_cluster,
            instance_desc,
            instance_addr,
            instance_port,
            instance_role,
            redis_password 
        FROM
            django_demo.db_instance 
        WHERE
            instance_engine = 'Redis' 
            AND instance_env = '生产'
            ;
            """

    mycursor.execute(query_sql)
    myresult = mycursor.fetchall()

    for i in myresult:
        (
            instance_cluster,
            instance_desc,
            instance_addr,
            instance_port,
            instance_role,
            redis_password,
        ) = i
        
        # 如果redis是有密码的,则先解密下
        if redis_password:
            plain_redis_password = decrypt(i[5])
            r = redis.Redis(
                host=str(i[2]),
                port=int(i[3]),
                decode_responses=False,
                socket_connect_timeout=2,
                password=plain_redis_password,
            )
        else:
            r = redis.Redis(
                host=str(i[2]),
                port=int(i[3]),
                decode_responses=False,
                socket_connect_timeout=2,
                password="",
            )

        slowlogs = r.slowlog_get(200)  # 这个值改大点
        for slowlog in slowlogs:
            ts = slowlog["start_time"]
            # 这里转成UTC时间格式,便于Kibana展示
            action_time = datetime.datetime.fromtimestamp(ts, pytz.UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
            duration_ms = int(slowlog["duration"]) / 1000  # 微秒转成毫秒

            cmd = slowlog[
                "command"
            ]

            content = {
                "@timestamp":str(action_time),
                "instance_cluster":str(i[0]),
                "instance_desc":str(i[1]),
                "instance_role":str(i[4]),
                "host_port":str(i[2])   "_"   str(i[3]),
                "duration_ms": duration_ms,
                "cmd":str(cmd),
            }
            # print(content)

            # 2 将扫到的redis slowlog 写入到ELK中
            response = es.index(index="redis_slowlog", body=json.dumps(content))
            print(response)

        # 3 将源端redis 慢日志清掉
        res3 = r.slowlog_reset()
        print(res3)

if __name__ == "__main__":
    send_log_to_elk()

ES里面的索引和mapping需要提前创建好

1、首次需要到ELK上去创建索引并设置mapping (注意 @timestamp设置为date类型)

代码语言:javascript复制
PUT redis_slowlog
{
    "mappings":{
        "properties":{
            "@timestamp":{
                "type":"date"
            },
            "instance_cluster":{
                "type":"keyword"
            },
            "instance_desc":{
                "type":"keyword"
            },
            "host_port":{
                "type":"text"
            },
            "duration_ms":{
                "type":"float"
            },
            "cmd":{
                "type":"text"
            },
            "instance_role":{
                "type":"text"
            }
        }
    }
}

命令行方式的查询(注意用UTC时间)

代码语言:javascript复制
POST redis_slowlog/_search
{
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "@timestamp":{
                            "gte":"2023-09-10T05:11:11Z",
                            "lte":"2023-09-11T11:35:31Z"
                        }
                    }
                },
                {
                    "match":{
                        "cmd":{
                            "query":"celery",
                            "fuzziness":"AUTO"
                        }
                    }
                }
            ]
        }
    }
}

定时任务,我这里用的是django celery。 如果是独立脚本的话,用linux的crontab也可以。

kibana看板最终效果类似如下:

0 人点赞