# zabbix整合elk收集系统异常日志触发告警
代码语言:javascript复制今天来了解一下关于ELK的“L”-Logstash,没错,就是这个神奇小组件,我们都知道,它是ELK不可缺少的组件,完成了输入(input),过滤(fileter),output(输出)工作量,也是我们作为运维人员需要掌握的难点,说到这里 ,又爱又恨;“爱之好,恨之难”;这个Logstash拥有这强大的插件功能,除了帮我们过滤,高效的输出日志,还能帮我们与Zabbix监控相结合? 因为我们的Logstash支持多种输出类型,能够收集web服务日志,系统日志,内核日志;但是;竟然是有日志输出,肯定避免不了错误(error)日志的出现;当Error日志出现的时候,虽然可以通过ELK查找出来,但是ELK不能实时提供报警,这就有点尴尬了,我们要做的就是能够像zabbix,nagios监控那样,不能要做到监控,还要做到报警,这一点,ELK只做到了监控,但是没有做到报警;不过没关系,我们的Logstash插件能够与zabbix结合起来,将需要告警 的日志收集起来(比如说有错误标识的日志)完成日志监控触发告警~ logstash支持多种输出介质,比如说syslog,http,tcp,elasticsearch,kafka等,如果我们将logstash收集的日志输出到zabbix告警,就必须要用到logstash-output-zabbix插件,通过这个插件将logstash与zabbix整合,logstash收集到的数据过滤出错误信息的日志输出到zabbix中,最后通过zabbix告警机制触发;
[root@localhost ~]# /usr/local/logstash/bin/logstash-plugin install logstash-output-zabbix #安装logstash-output-zabbix插件
Validating logstash-output-zabbix
Installing logstash-output-zabbix
Installation successful
环境案例需求:
通过读系统日志文件的监控,过滤掉日志信息中的异常关键词,如ERR,error,Failed,warning等信息,将这些带有异常关键词的异常日志信息过滤出来,然后输出到zabbix,通过zabbix告警机制实现触发告警;下面环境是filebeat作为采集端;输出到kafaka消息队列,最后由logsatsh拉取日志并过滤,输出到zabbix
filebeat
代码语言:javascript复制- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /opt/apps/nginx_proxy/log/*.log
- /opt/apps/idc/idc_service_go/logs/*.log
- /workspace/apps/goldmerry/business_server/storage/logs/*.log
- /workspace/apps/goldmerry/cms_server/storage/logs/*.log
- /opt/apps/bms/bms_go_server/logs/backend/*.log
- /opt/apps/bms/bms_go_server/logs/contractmgr/*.log
- /opt/apps/bms/bms_go_server/logs/crm/*.log
- /opt/apps/bms/bms_go_server/logs/frontend/*.log
- /opt/apps/bms/bms_go_server/logs/product/*.log
- /opt/apps/idc/idc_service_go/logs/*.log
- /workspace/apps/goldmerry/crm_server/runtime/log/**/*.log
- /opt/apps/idc/idc_python/logs/*.log
- /opt/apps/idc/idc_python_2/logs/*.log
#- c:programdataelasticsearchlogs*
fields:
topic: pro-idc_bms-logs
name: pro_bms_idc
output.kafka:
hosts: ["xx:9093","xx:9094","xx:9095"]
topic: '%{[fields.topic]}'
partition.round_robin:
reachable_only: true
required_acks: 1
logging:
to_syslog: false
to_files: true
files:
rotateeverybytes: 10485760 # 默认的10MB
level: info
代码语言:javascript复制input {
kafka {
type => "pro-idc_bms-logs"
bootstrap_servers => ["172.16.30.247:9093, 172.16.30.247:9094, 172.16.30.247:9095"]
client_id => "pro-idc_bms-logs"
topics => ["pro-idc_bms-logs"]
consumer_threads => 3
codec => "json"
}
}
filter {
if [fields][topic] == "pro-idc_bms-logs" { #指定filebeat产生的日志主题
mutate {
add_field => [ "[zabbix_key]", "oslogs" ] #新增的字段,字段名是zabbix_key,值为oslogs。
add_field => [ "[zabbix_host]", "%{[host][name]}" ] #新增的字段,字段名是zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{[host][name]}获取的就是日志数据的来源IP,这个来源IP在filebeat配置中的name选项进行定义。
}
}
}
output {
if [type] == "pro-idc_bms-logs" {
if [message] =~ /(error_code:!=0|ERROR|Failed|错误原因|status:404|process day|数据库操作失败)/ {
zabbix {
zabbix_host => "[zabbix_host]" #这个zabbix_host将获取上面filter部分定义的字段变量%{[host][name]}的值
zabbix_key => "[zabbix_key]" #这个zabbix_key将获取上面filter部分中给出的值
zabbix_server_host => "172.16.30.241" #这是指定zabbix server的IP地址
zabbix_server_port => "10051" #这是指定zabbix server的监听端口
zabbix_value => "message"
}
}
}
}
【zabbix-监控模板创建到 告警一触即发】
1.创建模板
将词模板链接到192.168.37.147上,创建的模板上的监控项就会在192.168.37.147上自动生效了
2.创建应用集,点击应用集-创建应用集
3.创建监控项,点击监控项,创建监控项
4.告警触发,创建 触发器
将咱们创建的收集日志的模板连接到 需要收集日志的主机,验证告警触发效果
注意事项:filebeat配置name必须要和zabbix主机名称一致,不然收集不到日志信息
关闭日志告警脚本
代码语言:javascript复制#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import requests
post_headers = {'Content-Type': 'application/json'}
#http://127.0.0.1/zabbix.php zabbix的web访问地址改成api访问
url = 'http://172.16.30.241:82/api_jsonrpc.php'
#这里的鉴权key可以通过user.login 接口获取 百度下一堆教程
#我用4.0版本的zabbix API 官方文档连接:https://www.zabbix.com/documentation/4.0/zh/manual/api/reference/user/login
key = '###############'
post_data = {
"jsonrpc": "2.0",
"method": "problem.get",
"params": {
"output": "extend",
"severities": 1
},
"auth": key,
"id": 1
}
ret = requests.post(url, data=json.dumps(post_data), headers=post_headers)
result = json.loads(ret.text)
#相关接口文档:https://www.zabbix.com/documentation/4.0/zh/manual/api/reference/event/acknowledge
for id in json.loads(ret.text)['result']:
print(id)
post_data = {
"jsonrpc": "2.0",
"method": "event.acknowledge",
"params": {
"eventids": id['eventid'],
"action": 1,
"message": "Problem resolved."
},
"auth": key,
"id": 1
}
ret = requests.post(url, data=json.dumps(post_data), headers=post_headers)
result = json.loads(ret.text)
print(result)
获取zabbix token
参考链接
https://www.cnblogs.com/bixiaoyu/p/9595698.html
https://blog.csdn.net/JineD/article/details/114440164