大数据ELK(二十二):采集Apache Web服务器日志

2022-10-12 08:28:50 浏览数 (1)

采集Apache Web服务器日志

一、需求

Apache的Web Server会产生大量日志,当我们想要对这些日志检索分析。就需要先把这些日志导入到Elasticsearch中。此处,我们就可以使用Logstash来实现日志的采集

打开这个文件,如下图所示。我们发现,是一个纯文本格式的日志。如下图所示:

 这个日志其实由一个个的字段拼接而成,参考以下表格

字段名

说明

client IP

浏览器端IP

timestamp

请求的时间戳

method

请求方式(GET/POST)

uri

请求的链接地址

status

服务器端响应状态

length

响应的数据长度

reference

从哪个URL跳转而来

browser

浏览器

因为最终我们需要将这些日志数据存储在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一个大文本存储所有的消息,而是需要将字段一个个的保存在Elasticsearch中。所以,我们需要在Logstash中,提前将数据解析好,将日志文本行解析成一个个的字段,然后再将字段保存到Elasticsearch中

二、准备日志数据

将Apache服务器日志上传到 /export/server/es/data/apache/ 目录

代码语言:javascript复制
mkdir -p /export/server/es/data/apache/

三、使用FileBeats将日志发送到Logstash

在使用Logstash进行数据解析之前,我们需要使用FileBeat将采集到的数据发送到Logstash。之前,我们使用的FileBeat是通过FileBeat的Harvester组件监控日志文件,然后将日志以一定的格式保存到Elasticsearch中,而现在我们需要配置FileBeats将数据发送到Logstash。FileBeat这一端配置以下即可:

代码语言:javascript复制
#----------------------------- Logstash output ---------------------------------
#output.logstash:
  # Boolean flag to enable or disable the output module.
  #enabled: true

  # The Logstash hosts
  #hosts: ["localhost:5044"]

hosts配置的是Logstash监听的IP地址/机器名以及端口号。

准备FileBeat配置文件

代码语言:javascript复制
cd /export/server/es/filebeat-7.6.1-linux-x86_64

vim filebeat-logstash.yml

因为Apache的web log日志都是以IP地址开头的,所以我们需要修改下匹配字段

代码语言:javascript复制
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/apache/log/access.*
  multiline.pattern: '^d .d .d .d  '
  multiline.negate: true
  multiline.match: after

output.logstash:
  enabled: true
  hosts: ["node1:5044"]

启动FileBeat,并指定使用新的配置文件

代码语言:javascript复制
./filebeat -e -c filebeat-logstash.yml

FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。

代码语言:javascript复制
2021-12-05T11:28:47.585 0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused

四、配置Logstash接收FileBeat数据并打印

Logstash的配置文件和FileBeat类似,它也需要有一个input、和output。基本格式如下:

代码语言:javascript复制
# #号表示添加注释
# input表示要接收的数据
input {
}

# file表示对接收到的数据进行过滤处理
filter {

}

# output表示将数据输出到其他位置
output {
}

配置从FileBeat接收数据

代码语言:javascript复制
cd /export/server/es/logstash-7.6.1/config
vim filebeat-print.conf
代码语言:javascript复制
input {
  beats {
    port => 5044
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

测试logstash配置是否正确

代码语言:javascript复制
bin/logstash -f config/filebeat-print.conf --config.test_and_exit
代码语言:javascript复制
[2021-12-05T11:46:33,940][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

启动logstash

代码语言:javascript复制
bin/logstash -f config/filebeat-print.conf --config.reload.automatic

 reload.automatic:修改配置文件时自动重新加载

测试

创建一个access.log.1文件,使用cat test >> access.log.1往日志文件中追加内容。

test文件中只保存一条日志:

代码语言:javascript复制
[root@node1 log]# cat test 
235.9.200.242 - - [15/Apr/2015:00:27:19  0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

当我们启动Logstash之后,就可以发现Logstash会打印出来从FileBeat接收到的数据:

代码语言:javascript复制
{
           "log" => {
          "file" => {
            "path" => "/var/apache/log/access.log.1"
        },
        "offset" => 825
    },
         "input" => {
        "type" => "log"
    },
         "agent" => {
        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",
             "version" => "7.6.1",
                "type" => "filebeat",
                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
            "hostname" => "node1.itcast.cn"
    },
    "@timestamp" => 2021-12-05T09:07:55.236Z,
           "ecs" => {
        "version" => "1.4.0"
    },
          "host" => {
        "name" => "node1"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19  0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",
      "@version" => "1"
}

五、Logstash输出数据到Elasticsearch

通过控制台,我们发现Logstash input接收到的数据没有经过任何处理就发送给了output组件。而其实我们需要将数据输出到Elasticsearch。所以,我们修改Logstash的output配置。配置输出Elasticsearch只需要配置以下就可以了:

代码语言:javascript复制
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }}

操作步骤:

1、重新拷贝一份配置文件

代码语言:javascript复制
cp filebeat-print.conf filebeat-es.conf

2、将output修改为Elasticsearch

代码语言:javascript复制
input {
  beats {
    port => 5044
  }
}

output {
 elasticsearch {
   hosts => [ "node1:9200","node2:9200","node3:9200"]
 }
}

3、重新启动Logstash

代码语言:javascript复制
bin/logstash -f config/filebeat-es.conf --config.reload.automatic

4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档

代码语言:javascript复制
cat test >> access.log.1 

// 查看索引数据

GET /_cat/indices?v

我们在Elasticsearch中发现一个以logstash开头的索引

代码语言:javascript复制
    {
        "health": "green",
        "status": "open",
        "index": "logstash-2021.12.05-000001",
        "uuid": "147Uwl1LRb-HMFERUyNEBw",
        "pri": "1",
        "rep": "1",
        "docs.count": "2",
        "docs.deleted": "0",
        "store.size": "44.8kb",
        "pri.store.size": "22.4kb"
    }

// 查看索引库的数据

GET /logstash-2021.12.05-000001/_search?format=txt

代码语言:javascript复制
{

    "from": 0,

    "size": 1

}

我们可以获取到以下数据:

代码语言:javascript复制
                    "@timestamp": "2021-12-05T09:38:00.402Z",
                    "tags": [
                        "beats_input_codec_plain_applied"
                    ],
                    "host": {
                        "name": "node1"
                    },
                    "@version": "1",
                    "log": {
                        "file": {
                            "path": "/var/apache/log/access.log.1"
                        },
                        "offset": 1343
                    },
                    "agent": {
                        "version": "7.6.1",
                        "ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",
                        "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
                        "hostname": "node1",
                        "type": "filebeat"
                    },
                    "input": {
                        "type": "log"
                    },
                    "message": "235.9.200.242 - - [15/Apr/2015:00:27:19  0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",
                    "ecs": {
                        "version": "1.4.0"
                    }

从输出返回结果,我们可以看到,日志确实已经保存到了Elasticsearch中,而且我们看到消息数据是封装在名为message中的,其他的数据也封装在一个个的字段中。我们其实更想要把消息解析成一个个的字段。例如:IP字段、时间、请求方式、请求URL、响应结果,这样

六、Logstash过滤器

在Logstash中可以配置过滤器Filter对采集到的数据进行中间处理,在Logstash中,有大量的插件供我们使用。参考官网:

Filter plugins | Logstash Reference [7.6] | Elastic

此处,我们重点来讲解Grok插件。

1、查看Logstash已经安装的插件

代码语言:javascript复制
bin/logstash-plugin list

2、Grok插件

Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。

Grok官网:Grok filter plugin | Logstash Reference [7.6] | Elastic

3、Grok语法

Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。

官网:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

grok模式的语法是:%{SYNTAX:SEMANTIC}

SYNTAX指的是Grok模式名称,SEMANTIC是给模式匹配到的文本字段名。例如:

%{NUMBER:duration} %{IP:client}

duration表示:匹配一个数字,client表示匹配一个IP地址

默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}

以下是常用的Grok模式:

NUMBER

匹配数字(包含:小数)

INT

匹配整形数字

POSINT

匹配正整数

WORD

匹配单词

DATA

匹配所有字符

IP

匹配IP地址

PATH

匹配路径

4、用法

代码语言:javascript复制
    filter {
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }
    }

七、匹配日志中的IP、日期并打印

代码语言:javascript复制
235.9.200.242 - - [15/Apr/2015:00:27:19  0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

我们使用IP就可以把前面的IP字段匹配出来,使用HTTPDATE可以将后面的日期匹配出来

配置Grok过滤插件

1、配置Logstash

代码语言:javascript复制
input {
    beats {
        port => 5044
    }
}

filter {
    grok {
        match => { 
            "message" => "%{IP:ip} - - [%{HTTPDATE:date}]" 
        }
    }   
}

output {
    stdout {
        codec => rubydebug
    }
}

2、启动Logstash

代码语言:javascript复制
bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
代码语言:javascript复制
{
           "log" => {
        "offset" => 1861,
          "file" => {
            "path" => "/var/apache/log/access.log.1"
        }
    },
         "input" => {
        "type" => "log"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "date" => "15/Apr/2015:00:27:19  0849",
           "ecs" => {
        "version" => "1.4.0"
    },
    "@timestamp" => 2021-12-05T11:02:05.809Z,
       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19  0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",
          "host" => {
        "name" => "node1"
    },
            "ip" => "235.9.200.242",
         "agent" => {
            "hostname" => "node1",
             "version" => "7.6.1",
        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",
                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
                "type" => "filebeat"
    },
      "@version" => "1"
}

我们看到,经过Grok过滤器插件处理之后,我们已经获取到了ip和date两个字段。接下来,我们就可以继续解析其他的字段

八、解析所有字段

将日志解析成以下字段:

字段名

说明

client IP

浏览器端IP

timestamp

请求的时间戳

method

请求方式(GET/POST)

uri

请求的链接地址

status

服务器端响应状态

length

响应的数据长度

reference

从哪个URL跳转而来

browser

浏览器

1、修改Logstash配置文件

代码语言:javascript复制
input {
    beats {
        port => 5044
    }
}

filter {
    grok {
        match => { 
            "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status} %{INT:length} "%{DATA:reference}" "%{DATA:browser}"" 
        }
    }   
}

output {
    stdout {
        codec => rubydebug
    }
}

2、测试并启动Logstash

我们可以看到,8个字段都已经成功解析。

代码语言:javascript复制
{
     "reference" => "www.baidu.com",
      "@version" => "1",
           "ecs" => {
        "version" => "1.4.0"
    },
    "@timestamp" => 2021-12-05T03:30:10.048Z,
            "ip" => "235.9.200.241",
        "method" => "POST",
           "uri" => "/it.cn/bigdata.html",
         "agent" => {
                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
        "ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972",
             "version" => "7.6.1",
            "hostname" => "node1",
                "type" => "filebeat"
    },
        "length" => "45",
        "status" => "200",
           "log" => {
          "file" => {
            "path" => "/var/apache/log/access.log"
        },
        "offset" => 1
    },
         "input" => {
        "type" => "log"
    },
          "host" => {
        "name" => "node1"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",
          "date" => "15/Apr/2015:00:27:19  0849",
       "message" => "235.9.200.241 - - [15/Apr/2015:00:27:19  0849] "POST /it.cn/bigdata.html HTTP/1.1" 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900""
}

九、将数据输出到Elasticsearch

到目前为止,我们已经通过了Grok Filter可以将日志消息解析成一个一个的字段,那现在我们需要将这些字段保存到Elasticsearch中。我们看到了Logstash的输出中,有大量的字段,但如果我们只需要保存我们需要的8个,该如何处理呢?而且,如果我们需要将日期的格式进行转换,我们又该如何处理呢?

1、过滤出来需要的字段

要过滤出来我们需要的字段。我们需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以对字段进行重命名、删除、替换或者修改结构。

官方文档:Mutate filter plugin | Logstash Reference [7.6] | Elastic

例如,mutate插件可以支持以下常用操作

配置:

注意:此处为了方便进行类型的处理,将status、length指定为int类型

代码语言:javascript复制
input {
    beats {
        port => 5044
    }
}

filter {
    grok {
        match => { 
            "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status:int} %{INT:length:int} "%{DATA:reference}" "%{DATA:browser}"" 
        }
    }
    mutate {
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]
    }
}

output {
    stdout {
        codec => rubydebug
    }
}

2、转换日期格式

要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:Date filter plugin | Logstash Reference [7.6] | Elastic

用法如下:

将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。

Logstash配置修改为如下:

代码语言:javascript复制
input {
    beats {
        port => 5044
    }
}

filter {
    grok {
        match => { 
            "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status:int} %{INT:length:int} "%{DATA:reference}" "%{DATA:browser}"" 
        }
    }
    mutate {
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]
    }
    date {
        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
        target => "date"
    }
}

output {
    stdout {
        codec => rubydebug
    }
}

启动Logstash:

代码语言:javascript复制
bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
代码语言:javascript复制
{
       "status" => "200",
    "reference" => "www.baidu.com",
       "method" => "POST",
      "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",
           "ip" => "235.9.200.241",
       "length" => "45",
          "uri" => "/it.cn/bigdata.html",
         "date" => 2015-04-14T15:38:19.000Z
}

3、输出到Elasticsearch指定索引

我们可以通过    

代码语言:javascript复制
elasticsearch {

        hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]

        index => "xxx"

}

index来指定索引名称,默认输出的index名称为:logstash-%{ yyyy.MM.dd}。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。

代码语言:javascript复制
input {
    beats {
        port => 5044
    }
}

filter {
    grok {
        match => { 
            "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status:int} %{INT:length:int} "%{DATA:reference}" "%{DATA:browser}"" 
        }
    }
    mutate {
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
    }
    date {
        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
        target => "date"
    }
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]
        index => "apache_web_log_%{ YYYY-MM}"
    }
}

启动Logstash

代码语言:javascript复制
bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exit

bin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic

注意:

  • index名称中,不能出现大写字符

0 人点赞