第十章· Logstash深入-Logstash与Redis那点事

2022-09-26 11:14:09 浏览数 (1)

  • Logstash将日志写入Redis

-曾老湿, 江湖人称曾老大。


-多年互联网运维工作经验,曾负责过大规模集群架构自动化运维管理工作。 -擅长Web集群架构与自动化运维,曾负责国内某大型金融公司运维工作。 -devops项目经理兼DBA。 -开发过一套自动化运维平台(功能如下): 1)整合了各个公有云API,自主创建云主机。 2)ELK自动化收集日志功能。 3)Saltstack自动化运维统一配置管理工具。 4)Git、Jenkins自动化代码上线及自动化测试平台。 5)堡垒机,连接Linux、Windows平台及日志审计。 6)SQL执行及审批流程。 7)慢查询日志分析web界面。


Logstash将日志写入Redis

为什么要使用Redis

在企业中,日志规模的量级远远超出我们的想象,这就是为什么会有一家公司日志易专门做日志收集,给大型金融公司收集日志,比如银行,因为你有可能看到,1秒钟好几千万的日志量,往服务器写入,那么企业中的集群,架构都不是单台的,而是多台的,一台如果是1千万,那么5台的量级,10台的量级,我们要对他们进行收集,进行分析,难免会在网络传输过程中,丢数据。

日志是什么? 日志对于企业来说,有什么作用? 用户使用我们的产品,体验如何? 用户的客诉,我们能拿出什么样的数据来说话? ...

一系列的问题,都和日志相关,如果至关重要的那个数据丢失了,那么公司的损失可不仅仅是一条日志那么简单。如果我们不知道,用户对我们产品最感兴趣的地方在哪,那么产品的寿命也就越来越短。如果被攻击了,恶意攻击的IP源我们都找不到,那么或许就不是产品的寿命越来越短,而是这个企业存在的寿命,越来越短。

好吧,一顿排比句,说的那么浮夸,说白了,我就是想要告诉你们,一个大规模日志量级的企业想要做到数据的安全性,数据的一致性,我们需要消息队列:Redis , Kafka,在ELK5版本中,建议使用Redis来做消息队列,Kafka能不能用?也能,只不过会有一些不必要的坑,需要我们去爬。在ELK6版本中,开始使用Kafka来做消息队列。

话不多说,我们接下来就开始将Logstash收集到的日志,输出到Redis中。


Redis部署

代码语言:javascript复制
#下载
[root@db04 ~]# wget http://download.redis.io/releases/redis-3.2.12.tar.gz
#解压
[root@db04 ~]# tar xf redis-3.2.12.tar.gz
#移动到指定目录
[root@db04 ~]# mv redis-3.2.12 /application/
#做软链接
[root@db04 ~]# ln -s /application/redis-3.2.12 /application/redis
#进入redis目录
[root@db04 ~]# cd /application/redis
#编译
[root@db04 redis]# make
#添加环境变量
[root@db04 redis]# vim /etc/profile.d/redis.sh
export PATH="/application/redis/src:$PATH"
#创建配置文件存放目录
[root@db04 ~]# mkdir -p /data/6379
#编辑redis配置文件
[root@db04 ~]# vim /data/6379/redis.conf
port 6379
daemonize yes
pidfile /data/6379/redis.pid
logfile "/data/6379/redis.log"
dbfilename dump.rdb
dir /data/6379
protected-mode no
requirepass  zls
#启动redis
[root@db04 ~]# redis-server /data/6379/redis.conf

Logstash收集日志输出至Redis

代码语言:javascript复制
#进入Logstash配置文件目录
[root@elkstack03 ~]# cd /etc/logstash/conf.d/
#编辑Logstash配置文件
[root@elkstack03 conf.d]# vim log_to_redis.conf
input {
  file {
    path => "/usr/local/tomcat/logs/tomcat_access_log.*.log"
    start_position => "end"
    type => "tc"
  }
  file {
    path => "/usr/local/nginx/logs/access_json.log"
    start_position => "end"
    type => "ngx"
    codec => json
  }
}

output {
  if [type] == "tc" {
    redis {
      data_type => "list"
      key => "tomcat_log"
      host => "10.0.0.54"
      port => "6379"
      db => "0"
      password => "zls"
   }
}
  if [type] == "ngx" {
    redis {
      data_type => "list"
      key => "nginx_log"
      host => "10.0.0.54"
      port => "6379"
      db => "1"
      password => "zls"
    }
  }
}
#启动Logstash
[root@elkstack03 conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/log_to_redis.conf &

验证Redis数据

代码语言:javascript复制
#连接redis
[root@elkstack04 ~]# redis-cli -a zls
#在0库中查看所有key
127.0.0.1:6379> KEYS *
1) "tomcat_log"
#查看tomcat_log的长度(日志的条数)
127.0.0.1:6379> LLEN tomcat_log
(integer) 8
#切换1库
127.0.0.1:6379> SELECT 1
OK
#在1库中查看所有key
127.0.0.1:6379[1]> KEYS *
1) "nginx_log"
#查看nginx_log的长度(日志的条数)
127.0.0.1:6379[1]> LLEN nginx_log
(integer) 6

#演示Logstash如何取走一条tomcat日志
127.0.0.1:6379> LPOP tomcat_log
"{"path":"/usr/local/tomcat/logs/tomcat_access_log.2019-04-08.log","@timestamp":"2019-04-08T13:43:35.779Z","@version":"1","host":"0.0.0.0","message":"{\"clientip\":\"10.0.0.53\",\"ClientUser\":\"-\",\"authenticated\":\"-\",\"AccessTime\":\"[08/Apr/2019:21:43:34  0800]\",\"method\":\"GET / HTTP/1.1\",\"status\":\"304\",\"SendBytes\":\"-\",\"Query?string\":\"\",\"partner\":\"-\",\"AgentVersion\":\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36\"}","type":"tc"}"
#再次查看长度
127.0.0.1:6379> Llen tomcat_log
(integer) 7


#演示Logstash如何取走一条nginx日志
127.0.0.1:6379[1]> LPOP nginx_log
"{"referer":"-","type":"ngx","http_host":"www.elk.com","url":"/index.html","path":"/usr/local/nginx/logs/access_json.log","upstreamhost":"-","@timestamp":"2019-04-08T13:43:19.000Z","size":0,"clientip":"10.0.0.53","domain":"www.elk.com","host":"10.0.0.53","@version":"1","responsetime":0.0,"xff":"10.0.0.1","upstreamtime":"-","status":"304"}"
#再次查看长度
127.0.0.1:6379[1]> LLEN nginx_log
(integer) 5


Logstash从Redis中取出日志输出到ES

代码语言:javascript复制
#进入Logstash配置文件目录
[root@elkstack03 ~]# cd /etc/logstash/conf.d/
#编辑Logstash配置文件
[root@elkstack03 conf.d]# vim redis_to_es.conf
input {
  redis {
    data_type => "list"
    key => "tomcat_log"
    host => "10.0.0.54"
    port => "6379"
    db => "0"
    password => "zls"
    codec => "json"
  }

  redis {
    data_type => "list"
    key => "nginx_log"
    host => "10.0.0.54"
    port => "6379"
    db => "1"
    password => "zls"
  }
}

output {
  if [type] == "tc" {
    elasticsearch {
      hosts => ["10.0.0.51:9200"]
      index => "m.elk.com-%{ YYYY.MM.dd}"
  }
}

  if [type] == "ngx" {
    elasticsearch {
      hosts => ["10.0.0.51:9200"]
      index => "www.elk.com-%{ YYYY.MM.dd}"
    }
  }
}
#启动Logstash
[root@elkstack03 conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis_to_es.conf &

验证Logstash中的数据是否被取出

代码语言:javascript复制
#连接Redis
[root@elkstack04 ~]# redis-cli -a zls
#查看所有key
127.0.0.1:6379> KEYS *
(empty list or set)
#切换1库
127.0.0.1:6379> SELECT 1
OK
#查看所有key
127.0.0.1:6379[1]> KEYS *
(empty list or set)


在ES中查看数据

打开浏览器,访问:http://10.0.0.51:9100/


将ES索引添加到Kibana中

打开浏览器,访问:http://10.0.0.54:5601

查看Kibana数据

Redis key堆积监控

实际环境当中,可能会出现reids当中堆积了大量的数据而logstash由于种种原因未能及时提取日志,此时会导致redis服务器的内存被大量使用,甚至出现如下内存即将被使用完毕的情景.

代码语言:javascript复制
[root@elkstack01 ~]# vim redis_keylenth.py
#!/usr/bin/env python
#coding:utf-8
#Author Driver_Zeng
import redis
def redis_conn():
    pool=redis.ConnectionPool(host="10.0.0.54",port=6379,db=2,password='zls')
    conn = redis.Redis(connection_pool=pool)
    data = conn.llen('tn')
    print(data)
redis_conn()

[root@elkstack01 ~]# python3 redis_keylenth.py
259

0 人点赞