ELK 简介
ELK 是一般被称作日志分析系统,是三款开源软件的简称。通常在业务服务上线后我们会部署一套 ELK 系统,方便我们通过图形化界面直接查找日志,快速找到问题源并帮助解决问题。
Elasticsearch
Elasticsearch 代表 ELK 中的 E,通常简称为 ES 。它是一个分布式 RESTful 风格的搜索和数据分析引擎,提供非常多的功能包括存储,搜索以及分析数据。具体的介绍可以查看官网:Elasticsearch(https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started.html)。
Logstash
Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据、格式化数据,然后将数据发送到相应的地方。详细介绍请访问:Logstash(https://www.elastic.co/guide/en/logstash/current/introduction.html)。
Kibana
Kibana 能够让我们使用可视化的方式操作 Elasticsearch 中的数据。详细介绍请访问:Kibana(https://www.elastic.co/guide/en/kibana/current/introduction.html)。
Beats:Filebeat
通常 ELK 可以使用 Logstash 直接收集日志源作为 Beats,是一类轻量级数据采集器的统称,Elastic 提供了非常多的 Beats 选择,我们使用的主要是操作文件的 Filebeat(https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html),用于转发和汇总日志文件,Filebeat 中最主要的两个组件为 prospectors(https://www.elastic.co/guide/en/beats/filebeat/current/how-filebeat-works.html#prospector) 和 harvesters(https://www.elastic.co/guide/en/beats/filebeat/current/how-filebeat-works.html#harvester) 。更加详细的介绍请访问:Filebeat(https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html)。
传统方式的对比
通常中小公司技术发展历程是从“单机大服务”到“多机微服务”这种模式(通常是先在市场中活下来再革了自己的命)。如果说传统的方式是人员少,为了业务发展而快速响应,所以一般服务实例会少,日志文件会比较集中,服务器资源也会相对有限;而业务发展起来之后,技术开始革命,服务开始拆分,日志文件数量根据服务实例数量动态变化,并且多分散在各不同的服务器中。
传统方式
- 优势:关键字查询,日志文件数量相对较少,业务少的前期,能快速响应,适合小团体;
- 缺点:无法多维度查询,不能直观查看信息,必须下载日志文件(或者 SSH 服务器进行操作),不适合团队。
ELK 方式
- 优势:统一查询入口,多维度查询与统计,适合团队;
- 缺点:不适合业务前期,需要额外的硬件资源。
ELK 系统架构图
ELK 系统架构,如下图所示。
工作流程如下:
- Filebeat 定时监控并收集每个服务的日志信息;
- Logstash 把格式化日志信息发送到 ES 中进行存储,同时发送到监控预警服务进行处理;
- 监控中心处理日志内容,配置相应策略通过邮件或者即时通讯方式告知开发人员;
- Kibana 结合 ES 提供的搜索功能进行查询,使用 Kibana 自带的图表功能进行统计。
采集:获取多个服务器中的日志
在每台业务服务器上面通过部署 Filebeat,配置相关的 Filebeat 参数收集器采集日志,然后发送到 Logstash 进行日志的过滤。Filebeat 收集日志的方式是按行来读取,在 Filebeat 中进行的配置 filebeat.yml 如下:
代码语言:javascript复制filebeat.prospectors:
- type: log
enabled: true # 开关
paths: # 日志文件路径,可以用用通配符
- /var/log/customer-info.log #
#- c:programdataelasticsearchlogs* 如果是windows服务器,用这个路径
multiline: # 日志多行处理,列如java的堆栈信息
pattern: ^d{4} # 匹配前缀为数字开头,如果不是日期,该行日志接到上一行后尾
negate: true
match: after
fields: # 自定义属性,用于 Logstash 中
service_name: customer # 产生日志的服务名
log_type: info # 日志文件类型
server_id: ip-address # 服务器ip地址
scan_frequency: 50 #扫描间隔,单位为秒;设置太小会引起filebeat频繁扫描文件,导致cpu占用百分比过高- type: log
enabled: true
paths:
- /var/log/customer-error.log
multiline:
pattern: ^d{4}
negate: true
match: after
fields:
service_name: customer
log_type: error
server_id: 127.0.0.1
scan_frequency: 60
output.logstash: # 输出到logstash的安装位置,以及监听的端口
hosts: ["129.1.7.203:5043"]
启动 filebeat 的命令为:./filebeat -e -c config/filebeat.yml
。
为什么使用 Filebeat 而不是直接使用 Logstash 来收集日志?原因有以下几点。
- Filebeat 更加的轻量级,Logstash 占用更多的系统资源,如果在每个服务器上部署 Logstash,有时候会影响到业务服务,导致服务响应缓慢;
- Filebeat 能够记录文件状态,文件状态记录在文件中(~FILEBEAT_HOME/data/registry)。此状态可以记住 harvesters 收集文件的偏移量,重启后 prospectors 能知道每个日志文件的记录状态再进行收集文件。;
- Filebeat 保证至少有一次输出,因为 Filebeat 将每个事件的传递状态保存在文件中。在没有得到接收方确认时,会尝试一直发送,直到得到回应。
传输:将日志数据传送给中央处理系统
Logstash 监控 Beats 源并且将 Beats 的数据进行过滤处理,Logstash 的优势是有非常丰富的插件提供使用。Logstash 的工作模式如下:
当输入插件监听到 beats 传过来数据时,使用过滤插件进行信息的过滤或者格式话处理,之后再通过输出插件输出到 ES 或者其它地方。Logstash 的配置文件 logstash.conf(如果没有直接在 config 目录下新建)如下:
代码语言:javascript复制input { # 指定输入源-beats
beats {
host => "localhost"
port => "5043"
}
}
filter { # 日志格式化,使用 grok 插件
if [fields][log_type] == 'error' { # 如果是error类型的日志该怎么处理,在filebeat 的fields中定义
grok { # 使用 grok 插件进行一整条日志信息格式成key-value信息
match => { "message" => "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{JAVACLASS:class} %{NUMBER:thread} %{JAVALOGMESSAGE:logmsg}" } # 这里采用的是grok预制的一些正则,":"后面是我们自定义的key
}
date { # 将 kibana 的查询时间改成日志的打印时间,方便之后查询,如果不改的话,kibana会有自己的时间,导致查询不方便
match => ["logdate", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
target => "@timestamp"
}
} if [fields][log_type] == 'info' { # 如果是info类型该怎么格式,这里是重复的,如果有日志格式不一样比如nginx的日志类型,可以在这里自己定义
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{JAVACLASS:class} %{NUMBER:thread} %{JAVALOGMESSAGE:logmsg}" }
}
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
target => "@timestamp"
}
}
}
output { # 输出设置
stdout { # 输出到控制台
codec => rubydebug
}
http { # 输出到日志处理中心,日志处理中心是团队内部开发项目
http_method => "post"
format => "message"
url => "http://localhost:8388/ding/notify"
content_type => "application/json"
message => '{"content": "%{message}","path":"%{source}","ip":"%{[fields][server_id]}"}'
# }
elasticsearch { # 输出到elasticsearch,提供给kibana进行搜索
hosts => [ "localhost:9200" ]
index => "%{[fields][service_name]}-%{ YYYY.MM.dd}" # 在es中存储的索引格式,按照“服务名-日期”进行索引
} if "ERROR" == [loglevel] { # 如果是ERROR日志单独发一份邮件给管理者,在写了日志处理中心后这里可以去掉,然后交给日志处理中心处理
email {
to => "email_me@163.com"
via => "smtp"
subject => "WARN: %{[fields][service_name]}项目出现ERROR异常"
htmlbody => "error_message:%{message}\nhost:%{[fields][service_id]}\nkibana:http://127.0.0.1:5601"
from => "noreply@163.com"
address => "smtp.163.com"
username => "noreply@163.com"
password => "password1234"
}
}
}
Logstash 的输入插件有很多,可以根据实际情况选择不同的输入插件,由于是使用 Filebeat 做日志搜集,这里采用 beats 作为输入源。Logstash 在实际的日志处理中,最主要的作用是做日志的格式化与过滤,它的过滤插件有非常多,我们在实际中主要用到的过滤插件是 Grok ,它是一种基于正则的方式来对日志进行格式化和过滤。
Grok 的语法规则是:%{预置正则表达式:自定义属性名称}
,如:%{TIMESTAMP_ISO8601:logdate}
。前面的TIMESTAMP_ISO8601
是预置的一些 Grok 表达式。更多预置的 Grok 表达式请访问:Grok 预置正则表达式(https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns)。
如果预置 Grok 表达式的不能满足实际需求,可以写自定义的表达式,语法为:(?<自定义属性名称>正则表达式)
。例如,我们在 Java 中有时遇到线程名:DiscoveryClient-InstanceInfoReplicator-0
, 这个时候可以自定义表达式为:(?<thread-name>[A-Za-z0-9-] [d]?)
。Grok 在线调试工具为 Grok Debugger(https://grokdebug.herokuapp.com/)。
在 Logstash 的输出插件中我们指定四个输出位置:控制台、HTTP、Elasticsearch、Email。
- 控制台:其中控制台输出主要方便最初的调试;
- Elasticsearch:输出到 Elasticsearch 是为了作为 Kibana 之后进行搜索和查询的来源;
- HTTP:输出到日志处理中心方便我们在日志处理中心做进一步处理,包括通知与告警策略处理;
- Email:适合在前期量小的时候,人员能快速响应,在有日志处理中心后,可以去掉 Email 输出。
存储:如何存储日志数据
日志输出最好统一规范,并且按天进行切分。Log 源文件会一直保存,可以购买了专用的硬盘存储日志,也可以交给数据分析部门做一些数据处理。编写脚本,每天凌晨1点脚本会把前天的日志文件传送到专用于存储日志文件的硬盘中。
在 ES 中存储的数据存储周期为一个月,ES 服务器的硬盘可以用 SSD,可以提高 ES 的性能。ES 中的数据索引采用“项目名-年月日”的方式,ES 的部署方式如果是单机部署,非常容易;如果有多台服务器资源的话可以设置成集群方式;我们线上 ES 的日志文件存储时间为 1 个月,1 个月过后 ES 存储的数据会删除。调用 ES 的通配符删除接口:curl -XDELETE elasticsearch_ip_address:9200/[index prefix]*?pretty
。
ES 如果是集群的方式请区分数据节点和 master 节点,集群配置非常方便,保证集群名称一样,并且加上集群的地址。在 config 目录下的 elasticsearch.yml 中进行如下配置。
master 节点的设置:
代码语言:javascript复制cluster.name: elk-servicenode.name: master-1node.master: truenode.data: false # 主节点只做控制network.host: ip-addresshttp.port: 9200 # http端口transport.tcp.port: 9300 # tcp端口#http.cors.enabled: true # 跨域使用#http.cors.allow-origin: "*"discovery.zen.ping.unicast.hosts: ["ip-address1:9300","ip-address2:9300"] # 其它节点的位置#discovery.zen.minimum_master_nodes: xx # 防止脑裂这里的主节点需要设置为xx=(可用主节点/2) 1
data节点的设置:
代码语言:javascript复制cluster.name: elk-servicenode.name: data-1node.master: falsenode.data: true # 数据节点做存储network.host: ip-addresshttp.port: 9200 # http端口transport.tcp.port: 9300 # tcp端口#http.cors.enabled: true # 跨域使用#http.cors.allow-origin: "*"discovery.zen.ping.unicast.hosts: ["ip-address1:9300","ip-address2:93000"]#discovery.zen.minimum_master_nodes: xx # 防止脑裂这里的主节点需要设置为xx=(可用主节点/2) 1
启动 ES 如果报错
[1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
,用户切换到 root :vim /etc/sysctl.conf
,添加:vm.max_map_count=655360
,之后使用sysctl -p
就可以了。