背景
随着一年一度的护网行动进行,大家都在加紧加固现网的安全设备,增加各种控制规则,封禁成千上万的公网IP,升级各种漏洞打各种补丁。安全部门也提出需求,需要对边界安全设备日志进行收集分析,及时发现异常访问事件。在此之前虽然对边界设备的日志进行收集但是没有对字段进行拆解,无法通过字段筛选进行分析,正常情况下可以通过正则表达式去匹配字段格式拆分字段,面临的问题在于安全设备的日志字段排序不是统一的,无法通过正则完全匹配,瞬间脑袋瓜子嗡嗡的,各种查各种找还是没头绪,最后社区大佬介绍使用 kv 过滤插件实现字段拆解。
kv 过滤插件官方介绍
https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
kv描述
此筛选器有助于自动解析各种消息(或特定事件字段)比如foo=bar。
例如一条包含的日志消息ip=1.2.3.4 error=REFUSED,则可以通过配置以下内容自动解析它们:
filter { kv { } }
上面的结果将显示一条包含ip=1.2.3.4 error=REFUSED以下字段拆解的结果:
ip: 1.2.3.4
error: REFUSED
kv过滤插件非常适用于key=value语法的日志。如果日志数据格式不是使用=符号和空格构成的,则可以配置任意字符串以分割数据。例如,此过滤器还可用于解析查询参数,例如 foo=bar&baz=fizz将field_split参数设置为&。
kv参数介绍
代码语言:javascript复制filter {
if [type] == "kv" {
kv {
source => "message"
prefix => "ex_"
field_split => "&? "
allow_duplicate_values => false
default_keys => {
"from" => "A"
"to" => "B"
}
trim_value => "<>[],"
trim_key => "<>[],"
value_split => "=:"
}
}
}
#output { stdout { codec => rubydebug } }
source:数据源默认为message,需要key=value在其上执行搜索的字段
prefix:一个字符串,位于所有解析字段之前,给所有解析出来的字段加上一个前缀
field_split:用作单字符字段定界符的字符串,用于解析键值的分隔符,默认值为 "空格"
allow_duplicate_values: 布尔类型,用于删除重复的键值对。设置为false时,将仅保留一对唯一的键值对,默认值true,不删除重复键值
default_keys: 指定默认键及其值的哈希值,如果这些键在要解析的源字段中不存在,则应将其添加到事件中
trim_value: 用于去除解析后value里面包含的小括号或者中括号等符号
trim_key: 用于去除解析后key里面包含的小括号或者中括号等符号
value_split:设置键值识别关系的分隔符,默认为=
安全设备原始日志数据
Apr 17 08:29:14 127.0.0.1 date=2021-04-17 time=08:32:53 device_id=FADV040000184375 log_id=0101008001 type=traffic subtype=slb_http pri=information vd=root msg_id=144826897 duration=26008 ibytes=553 obytes=350 proto=6 service=https src=59.53.27.12 src_port=3301 dst=23.209.22.6 dst_port=443 trans_src=59.53.27.12 trans_src_port=32934 trans_dst=23.29.31.11 trans_dst_port=30014 policy=VS_CR_23-29-28-6_443 action=none http_method=post http_host="plms.sf-dsc.com" http_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3823.400 QQBrowser/10.7.4307.400" http_url="/endpoint/918/z00gj0xz/xhr" http_qry="t=1618619328955" http_referer="https://ip/wms.html?customer=dhlc-wms-prod&lang=zh" http_cookie="cookiesession1=63B5DAC3P1N0NA6KNFUE4PBYDHNWC28A; fad=rs2|YHosn" http_retcode=200 user=none usrgrp=none auth_status=none srccountry=China dstcountry=Mexico real_server=RS_AZ1_WAF_23.29.21.11
Apr 17 08:32:34 127.0.0.1 date=2021-04-17 time=08:36:13 device_id=FADV040000184375 log_id=0104009000 type=traffic subtype=dns pri=information vd=root msg_id=144827370 proto=17 src=3.221.42.234 src_port=16453 dst=23.29.28.5 dst_port=53 policy=dns_policy action=none fqdn=elink-ftps.awsapp.sf-dsc.com resip=52.80.19.28 srccountry=United States dstcountry=Mexico
通过logstash kv filter 过滤解析
代码语言:javascript复制if "aabbcc" in [tags] {
grok{
match => {"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}"}
}
kv{
source=> "info"
}
mutate {
remove_field => ["message","info","year","offset","host"]
}
}
拆解后字段