ELK日志收集分析系统配置

2018-03-05 14:15:24 浏览数 (1)

ELK是日志收益与分析的利器。

1、elasticsearch集群搭建

2、logstash日志收集

我这里的实现分如下2步,中间用redis队列做缓冲,可以有效的避免es压力过大:

1、n个agent对n个服务的log做日志收集(1对1的方式),从日志文件解析数据,存入broker,这里用的是redis的发布订阅模式的消息队列,当然你可以选用kafka,redis比较方便;

2、indexer做日志汇总,从redis队列中拿数据入es;

下面给出agent和index的配置示例:

1、driver_schedule.conf

代码语言:javascript复制
input {
  file {
      #这是日志路径
      path => [
          "/home/xiaoju/driver-schedule-api/logs/driver-schedule-api.info.*",
          "/home/xiaoju/driver-schedule-api/logs/driver-schedule-api.error.*"
        ]
      #排除路径,支持glob展开,但是不递归
      exclude => [
          "access.*"
      ]
      #开始位置,beginning从日志开始读取
      start_position => "beginning"
      #sincedb指示的文件,记录日志读取位置
      sincedb_path => "/home/xiaoju/yangfan/local/logstash-1.4.2/sincedb/driver_schedule_progress"
      #添加记录字段
      add_field => {
          "server" => "driver_schedule"
      }
      #编码器,正则pattern多行合并
      codec => multiline {
          pattern => "^d :d "
          negate => true
          what => "previous"
      }
  }
}

filter {
    #匹配路径中包涵info
    if [path] =~ "info" {
        #mutate更改值
        mutate {
            replace => { "type" => "info" }
        }
        grok {
            match => { "message" => "%{COMBINEDAPACHELOG}" }
        }
    }else if [path] =~ "error" {
        mutate {
            replace => { "type" => "error" }
        }
    } else {
        mutate { replace => { "type" => "unknow" } }
    }

    date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
}

output {
    #debug格式化打印
    #stdout{codec => rubydebug}
    redis {
        host => "10.94.99.55"
        #这里用的是redis订阅模式,对应indexer的data_type应是pattern_channel
        data_type => "channel"
        port => 6379
        db => 5
        key => "logstash:%{[server]}"
    }
}

run起来:

代码语言:javascript复制
nohup ./bin/logstash -f ./conf/agent/driver_schedule.conf &

2、indexer.conf

代码语言:javascript复制
input {
    redis {
        host => "10.94.99.55"
        port => 6379
        db => 5
        #如果这里选择了pattern_channel, 采用的是redis的订阅方式, agent里data_type就要对应channel
        data_type => "pattern_channel"
        #这是所有的key的匹配pattern
        key => "logstash:*"
    }
}

output {
    elasticsearch {
        embedded => false
        protocol => "http"
        host => "10.94.99.56"
        port => 9211
        #配置es索引名字
        index => "%{[server]}"
        #配置es索引类型
        index_type => "%{[type]}"
    }
    #debug使用, 格式化打印
    #stdout{codec => rubydebug}
}

run起来:

代码语言:javascript复制
nohup ./bin/logstash -f ./conf/indexer/indexer.conf &

3、kibana配置

网上教程比较多,这里我只mark一些问题的解决方法:

1、connection failure: 

checklist:

1、配置kibana的config.js里的es地址

2、如果es版本>1.4则需要在es的配置里加入

http.cors.allow-origin: "/.*/"

http.cors.enabled: true

注意事项:

1、ES和logstash最好选用相同大版本,不然可能写不进去

2、logstash会写一个syncsys的文件,记录上次读取文件到什么地方

0 人点赞