Logstash Kv filter plugin(安全设备日志字段解析)

2021-04-29 10:57:33 浏览数 (1)

背景

随着一年一度的护网行动进行,大家都在加紧加固现网的安全设备,增加各种控制规则,封禁成千上万的公网IP,升级各种漏洞打各种补丁。安全部门也提出需求,需要对边界安全设备日志进行收集分析,及时发现异常访问事件。在此之前虽然对边界设备的日志进行收集但是没有对字段进行拆解,无法通过字段筛选进行分析,正常情况下可以通过正则表达式去匹配字段格式拆分字段,面临的问题在于安全设备的日志字段排序不是统一的,无法通过正则完全匹配,瞬间脑袋瓜子嗡嗡的,各种查各种找还是没头绪,最后社区大佬介绍使用 kv 过滤插件实现字段拆解。

kv 过滤插件官方介绍

https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html

kv描述

此筛选器有助于自动解析各种消息(或特定事件字段)比如foo=bar。

例如一条包含的日志消息ip=1.2.3.4 error=REFUSED,则可以通过配置以下内容自动解析它们:

filter { kv { } }

上面的结果将显示一条包含ip=1.2.3.4 error=REFUSED以下字段拆解的结果:

ip: 1.2.3.4

error: REFUSED

kv过滤插件非常适用于key=value语法的日志。如果日志数据格式不是使用=符号和空格构成的,则可以配置任意字符串以分割数据。例如,此过滤器还可用于解析查询参数,例如 foo=bar&baz=fizz将field_split参数设置为&。

kv参数介绍

代码语言:javascript复制
filter {
    if [type] == "kv" {
        kv {
            source => "message"
            prefix => "ex_"
            field_split => "&? "
            allow_duplicate_values => false
            default_keys => {
                "from" => "A"
                "to" => "B"
            }
            trim_value => "<>[],"
            trim_key => "<>[],"
            value_split => "=:"
        }
    }
}
#output { stdout { codec => rubydebug } }

source:数据源默认为message,需要key=value在其上执行搜索的字段

prefix:一个字符串,位于所有解析字段之前,给所有解析出来的字段加上一个前缀

field_split:用作单字符字段定界符的字符串,用于解析键值的分隔符,默认值为 "空格"

allow_duplicate_values: 布尔类型,用于删除重复的键值对。设置为false时,将仅保留一对唯一的键值对,默认值true,不删除重复键值

default_keys: 指定默认键及其值的哈希值,如果这些键在要解析的源字段中不存在,则应将其添加到事件中

trim_value: 用于去除解析后value里面包含的小括号或者中括号等符号

trim_key: 用于去除解析后key里面包含的小括号或者中括号等符号

value_split:设置键值识别关系的分隔符,默认为=

安全设备原始日志数据

Apr 17 08:29:14 127.0.0.1 date=2021-04-17 time=08:32:53 device_id=FADV040000184375 log_id=0101008001 type=traffic subtype=slb_http pri=information vd=root msg_id=144826897 duration=26008 ibytes=553 obytes=350 proto=6 service=https src=59.53.27.12 src_port=3301 dst=23.209.22.6 dst_port=443 trans_src=59.53.27.12 trans_src_port=32934 trans_dst=23.29.31.11 trans_dst_port=30014 policy=VS_CR_23-29-28-6_443 action=none http_method=post http_host="plms.sf-dsc.com" http_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3823.400 QQBrowser/10.7.4307.400" http_url="/endpoint/918/z00gj0xz/xhr" http_qry="t=1618619328955" http_referer="https://ip/wms.html?customer=dhlc-wms-prod&lang=zh" http_cookie="cookiesession1=63B5DAC3P1N0NA6KNFUE4PBYDHNWC28A; fad=rs2|YHosn" http_retcode=200 user=none usrgrp=none auth_status=none srccountry=China dstcountry=Mexico real_server=RS_AZ1_WAF_23.29.21.11

Apr 17 08:32:34 127.0.0.1 date=2021-04-17 time=08:36:13 device_id=FADV040000184375 log_id=0104009000 type=traffic subtype=dns pri=information vd=root msg_id=144827370 proto=17 src=3.221.42.234 src_port=16453 dst=23.29.28.5 dst_port=53 policy=dns_policy action=none fqdn=elink-ftps.awsapp.sf-dsc.com resip=52.80.19.28 srccountry=United States dstcountry=Mexico

通过logstash kv filter 过滤解析

代码语言:javascript复制
if "aabbcc" in [tags] {
    grok{
      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}"}
        }
    kv{
      source=> "info"
        }
mutate {
      remove_field => ["message","info","year","offset","host"]
    }

  }

拆解后字段

0 人点赞