filebeat及logstash配置

2023-05-19 19:52:11 浏览数 (2)

记录下filebeat及logstash配置语法。

# 配置filebeat收集nginx日志及java日志

代码语言:javascript复制
filebeat.inputs:
- type: log
  enabled: True
  fields:
    log_type: nginx-access
    project_name: shsp
    log_topic: common_nginx
  fields_under_root: true
  paths:
    - /apps/usr/nginx/logs/access.log

- type: log
  enabled: True
  fields:
    project_name: shsp
    log_type: nginx-error
    log_topic: common_nginx
  fields_under_root: true
  paths:
    - /apps/usr/nginx/logs/error.log


- type: log
  enabled: True
  multiline.pattern: '^[[:space:]] |^Caused by:'
  multiline.negate: false
  multiline.match: after
  fields:
    project_name: shsp
    log_type: app_log
    log_topic: app_all
  fields_under_root: true
  paths:
    - /apps/usr/appdata/logs/*.log

output.kafka:
  hosts: ["192.168.20.2:9092", "192.168.20.3:9092", "192.168.20.4:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

processors:
  - drop_fields:
      fields: ['ecs', 'beat', 'input', '@version', 'agent']

# logstash处理nginx日志

代码语言:javascript复制
input {
    kafka {
        bootstrap_servers => "192.168.20.2:9092,192.168.20.3:9092,192.168.20.4:9092"
        topics => ["common_nginx"]
        codec => json { charset => "UTF-8" }
        group_id => "standard"
        consumer_threads => 8
    }
}
filter {
   if [log_type] == "nginx-access" {
        grok {
           match => { "message" => ["%{IPORHOST:[access][remote_ip]} - %{DATA:[access][user_name]} [%{HTTPDATE:[access][time]}] "%{WORD:[access][method]} %{DATA:[access][url]} HTTP/%{NUMBER:[access][http_version]}" %{NUMBER:[access][response_code]} %{NUMBER:[access][body_sent][bytes]} "%{DATA:[access][referrer]}" "%{DATA:[access][agent]}""] }
        remove_field => "message"
        }
        mutate {
            add_field => { "read_timestamp" => "%{@timestamp}" }
        }
        date {
            match => [ "[access][time]", "dd/MMM/YYYY:H:m:s Z" ]
            remove_field => "[access][time]"
        }
        useragent {
            source => "[access][agent]"
            target => "[access][user_agent]"
            remove_field => "[access][agent]"
        }
        geoip {
            source => "[access][remote_ip]"
            target => "[geoip]"
        }
    } 
    else if [log_type] == "nginx-error" {
        grok {
            match => { "message" => ["%{DATA:[error][time]} [%{DATA:[error][level]}] %{NUMBER:[error][pid]}#%{NUMBER:[error][tid]}: (*%{NUMBER:[error][connection_id]} )?%{GREEDYDATA:[error][message]}"] }
            remove_field => "message"
        }
        mutate {
            rename => { "@timestamp" => "read_timestamp" }
        }
        date {
            match => [ "[error][time]", "YYYY/MM/dd H:m:s"]
            remove_field => "[error][time]"
        }
    }
}

output {
    #stdout { 
    #    codec => rubydebug
    #}

    elasticsearch {
        hosts => [ "192.168.20.11:9200","192.168.20.12:9200","192.168.20.13:9200" ]
        user => "elastic"
        password => "abcd"
        index => "logstash-nginx-%{project_name}-%{ YYYY.MM.dd}"
        codec => json { charset => "UTF-8" }
    }
}

# logstash处理java日志

代码语言:javascript复制
input {
    kafka {
        bootstrap_servers => "192.168.20.2:9092,192.168.20.3:9092,192.168.20.4:9092"
        topics => ["app_all"]
        codec => json { charset => "UTF-8" }
        group_id => "standard"
        consumer_threads => 8
    }
}
filter {
    mutate {
                add_field => { "log_path" => "%{[log][file][path]}" }
                add_field => { "host_name" => "%{[host][name]}"}
        }
        mutate {
            split => ["[log][file][path]", "/"]
        }
    mutate {
              split => ["[log][file][path][-1]", "."]
    }
        mutate {
                add_field => { "log_name" => "%{[log][file][path][-1][0]}" }
        }
        mutate{
            remove_field => ["log", "host"]

        }  
}

output {
    #stdout { 
    #    codec => rubydebug
    #}

    elasticsearch {
        hosts => [ "192.168.20.11:9200","192.168.20.12:9200","192.168.20.13:9200" ]
        user => "elastic"
        password => "abcd"
        index => "logstash-app_%{project_name}-%{ YYYY.MM.dd}"
        codec => json { charset => "UTF-8" }
    }
}

# filebeat配置fields中字段介绍

在 Filebeat 的配置文件中,fields 配置项允许你添加自定义字段,以便更好地描述、分类或标记日志事件。

fields 配置项通常位于 Filebeat 配置文件的 filebeat.inputs 部分,如下所示:

代码语言:javascript复制
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  fields:
    app_name: my_app
    environment: production

在这个例子中,我们向每个日志事件添加了两个自定义字段:app_nameenvironment。这些字段在后续处理和分析日志数据时非常有用,可以帮助我们根据应用名称或环境对日志进行过滤、查询和聚合。

在 Filebeat 中,你可以添加任意数量的自定义字段,以满足你对日志数据的标记和分类需求。这些字段在 Filebeat 将日志数据发送到目标时保留,并可以在日志处理过程中被使用。

fields_under_root介绍

在 Filebeat 配置文件中,fields_under_root 是一个布尔选项,用于控制自定义字段(通过 fields 配置项添加)是作为顶层字段还是子级字段添加到日志事件中。默认情况下,fields_under_root 选项的值为 false,这意味着自定义字段将作为子级字段添加到事件中。

如果将 fields_under_root 设置为 true,则自定义字段将添加到事件的顶层。这样的设置可能对于与其他系统的集成和兼容性非常重要,因为某些系统可能要求在特定的顶层字段中存储一些元数据。

以下是一个配置示例:

代码语言:javascript复制
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  fields:
    app_name: my_app
    environment: production
  fields_under_root: true

在这个例子中,fields_under_root 被设置为 true,所以 app_nameenvironment 这两个自定义字段将直接作为顶层字段添加到日志事件中。如果将 fields_under_root 设置为 false 或不设置,那么这些字段将作为子级字段添加到事件中,如 fields.app_namefields.environment

0 人点赞