一、前言
随着时间的积累,日志数据会越来越多,当你需要查看并分析庞杂的日志数据时,可通过 Filebeat Kafka Logstash Elasticsearch 采集日志数据到Elasticsearch(简称ES)中,并通过 Kibana 进行可视化展示与分析。
本文介绍具体的实现方法。
二、背景信息
Kafka 是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。在实际应用场景中,为了满足大数据实时检索的需求,一般可以使用 Filebeat 采集日志数据,将 Kafka 作为 Filebeat 的输出端。Kafka 实时接收到 Filebeat 采集的数据后,以 Logstash 作为输出端输出。输出到 Logstash 中的数据在格式或内容上可能不能满足你的需求,此时可以通过 Logstash 的 filter 插件过滤数据。最后将满足需求的数据输出到 ES 中进行分布式检索,并通过 Kibana 进行数据分析与展示。
简单处理流程如下:
三、操作流程
- 准备工作:
- 完成环境准备
- 包括创建对应服务
- 安装 Filebeat 。
- 配置 Filebeat:配置 Filebeat 的 input 为系统日志,outpu 为 Kafka,将日志数据采集到 Kafka 的指定 Topic 中。
- 配置 Logstash 管道:配置 Logstash 管道的 input 为 Kafka,output 为ES,使用 Logstash 消费 Topic 中的数据并传输到ES 中。
- 查看日志消费状态:在消息队列 Kafka 中查看日志数据的消费的状态,验证日志数据是否采集成功。
- 通过 Kibana 过滤日志数据:在 Kibana 控制台的 Discover 页面,通过 Filter 过滤出 Kafka 相关的日志。
四、准备工作
CenterOS 7.6 版本,推荐 8G 以上内存。
1、Docker 环境
执行命令如下:
代码语言:javascript复制# 在 docker 节点执行
# 腾讯云 docker hub 镜像
# export REGISTRY_MIRROR="https://mirror.ccs.tencentyun.com"
# DaoCloud 镜像
# export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"
# 阿里云 docker hub 镜像
export REGISTRY_MIRROR=https://registry.cn-hangzhou.aliyuncs.com
# 安装 docker
# 参考文档如下
# https://docs.docker.com/install/linux/docker-ce/centos/
# https://docs.docker.com/install/linux/linux-postinstall/
# 卸载旧版本
yum remove -y docker
docker-client
docker-client-latest
docker-ce-cli
docker-common
docker-latest
docker-latest-logrotate
docker-logrotate
docker-selinux
docker-engine-selinux
docker-engine
# 设置 yum repository
yum install -y yum-utils
device-mapper-persistent-data
lvm2
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安装并启动 docker
yum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13
mkdir /etc/docker || true
cat > /etc/docker/daemon.json <<EOF
{
"registry-mirrors": ["${REGISTRY_MIRROR}"],
"exec-opts": ["native.cgroupdriver=systemd"],
"log-driver": "json-file",
"log-opts": {
"max-size": "100m"
},
"storage-driver": "overlay2",
"storage-opts": [
"overlay2.override_kernel_check=true"
]
}
EOF
mkdir -p /etc/systemd/system/docker.service.d
# Restart Docker
systemctl daemon-reload
systemctl enable docker
systemctl restart docker
# 关闭 防火墙
systemctl stop firewalld
systemctl disable firewalld
# 关闭 SeLinux
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config
# 关闭 swap
swapoff -a
yes | cp /etc/fstab /etc/fstab_bak
cat /etc/fstab_bak |grep -v swap > /etc/fstab
验证下 docker info:
代码语言:javascript复制[root@vm-1]# docker info
Client:
Debug Mode: false
Server:
Containers: 16
Running: 11
Paused: 0
Stopped: 5
Images: 22
Server Version: 19.03.11
Storage Driver: overlay2
Backing Filesystem: xfs
Supports d_type: true
Native Overlay Diff: true
Logging Driver: json-file
Cgroup Driver: systemd
Plugins:
Volume: local
Network: bridge host ipvlan macvlan null overlay
Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
Swarm: inactive
Runtimes: runc
Default Runtime: runc
Init Binary: docker-init
containerd version: 7ad184331fa3e55e52b890ea95e65ba581ae3429
runc version: dc9208a3303feef5b3839f4323d9beb36df0a9dd
init version: fec3683
Security Options:
seccomp
Profile: default
Kernel Version: 3.10.0-1127.el7.x86_64
Operating System: CentOS Linux 7 (Core)
OSType: linux
Architecture: x86_64
CPUs: 4
Total Memory: 11.58GiB
Name: vm-autotest-server
ID: KQ5B:KAG5:LLB5:CUD4:NQZX:4GHL:5XLY:FM7X:KRJ5:X3WK:42GV:QLON
Docker Root Dir: /var/lib/docker
Debug Mode: false
Registry: https://index.docker.io/v1/
Labels:
Experimental: false
Insecure Registries:
172.16.62.179:5000
127.0.0.0/8
Registry Mirrors:
https://registry.cn-hangzhou.aliyuncs.com/
Live Restore Enabled: false
2、Docker Compose 环境
代码语言:javascript复制❝Docker Compose是一个用于定义和运行多个 docker 容器应用的工具。使用 Compose 你可以用 YAML 文件来配置你的应用服务,然后使用一个命令,你就可以部署你配置的所有服务了。 ❞
# 下载 Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 修改该文件的权限为可执行
chmod x /usr/local/bin/docker-compose
# 验证信息
docker-compose --version
3、版本准备
「组件」 | 「版本」 | 「部署方式」 |
---|---|---|
elasticsearch | 7.6.2 | Docker Compose |
logstash | 7.6.2 | Docker Compose |
kibana | 7.6.2 | Docker Compose |
zookeeper | latest | Docker Compose |
kafka | latest | Docker Compose |
filebeat | 7.4.2 | 二进制 |
4、环境初始化
执行命令如下:
代码语言:javascript复制# 需要设置系统内核参数,否则 ES 会因为内存不足无法启动
# 改变设置
sysctl -w vm.max_map_count=262144
# 使之立即生效
sysctl -p
# 创建 logstash 目录,并将 Logstash 的配置文件 logstash.conf 拷贝到该目录
mkdir -p /mydata/logstash
# 需要创建 elasticsearch/data 目录并设置权限,否则 ES 会因为无权限访问而启动失败
mkdir -p /mydata/elasticsearch/data/
chmod 777 /mydata/elasticsearch/data/
5、服务安装
docker-compose.yml 文件内容为:
代码语言:javascript复制version: '3'
services:
elasticsearch:
image: elasticsearch:7.6.2
container_name: elasticsearch
user: root
environment:
- "cluster.name=elasticsearch" #设置集群名称为elasticsearch
- "discovery.type=single-node" #以单一节点模式启动
volumes:
- /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载
- /mydata/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载
- /etc/localtime:/etc/localtime:ro
- /usr/share/zoneinfo:/usr/share/zoneinfo
ports:
- 9200:9200
- 9300:9300
networks:
- elastic
logstash:
image: logstash:7.6.2
container_name: logstash
environment:
- TZ=Asia/Shanghai
volumes:
- /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf #挂载logstash的配置文件
depends_on:
- elasticsearch #kibana在elasticsearch启动之后再启动
links:
- elasticsearch:es #可以用es这个域名访问elasticsearch服务
ports:
- 5044:5044
networks:
- elastic
kibana:
image: kibana:7.6.2
container_name: kibana
links:
- elasticsearch:es #可以用es这个域名访问elasticsearch服务
depends_on:
- elasticsearch #kibana在elasticsearch启动之后再启动
environment:
- "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址
- /etc/localtime:/etc/localtime:ro
- /usr/share/zoneinfo:/usr/share/zoneinfo
ports:
- 5601:5601
networks:
- elastic
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
volumes:
- /mydata/zookeeper/data:/data
- /mydata/zookeeper/log:/datalog
- /etc/localtime:/etc/localtime:ro
- /usr/share/zoneinfo:/usr/share/zoneinfo
networks:
- elastic
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /mydata/kafka:/kafka
- /etc/localtime:/etc/localtime:ro
- /etc/localtime:/etc/localtime:ro
links:
- zookeeper
ports:
- "9092:9092"
networks:
- elastic
environment:
- KAFKA_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=OUT
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES=2000000
- KAFKA_CREATE_TOPICS=logs:1:1
networks:
elastic:
将该文件上传的 linux 服务器上,执行 docker-compose up 命令即可启动所有服务。
代码语言:javascript复制[root@vm-1]# docker-compose -f docker-compose.yml up -d
[root@vm-1]# docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------
elasticsearch /usr/local/bin/docker-entr ... Up 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kafka start-kafka.sh Up 0.0.0.0:9092->9092/tcp
kibana /usr/local/bin/dumb-init - ... Up 0.0.0.0:5601->5601/tcp
logstash /usr/local/bin/docker-entr ... Up 0.0.0.0:5044->5044/tcp, 9600/tcp
zookeeper /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
[root@vm-autotest-server elk]#
filebeat 客户端安装方式:
代码语言:javascript复制curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gz
tar xzvf filebeat-7.4.2-linux-x86_64.tar.gz
cd filebeat-7.4.2-linux-x86_64
6、服务设置
当所有依赖服务启动完成后,需要对以下服务进行一些设置。
代码语言:javascript复制# elasticsearch 需要安装中文分词器 IKAnalyzer,并重新启动。
docker exec -it elasticsearch /bin/bash
#此命令需要在容器中运行
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
docker restart elasticsearch
# logstas h需要安装 json_lines 插件,并重新启动。
docker exec -it logstash /bin/bash
logstash-plugin install logstash-codec-json_lines
docker restart logstash
五、配置 Filebeat
修改 filebeat.yml 文件内容
代码语言:javascript复制ilebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/*.log
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.dashboards.enabled: false
setup.kibana:
host: "http://kafka:5601"
output.kafka:
hosts: ["kafka:9092"]
topic: 'logs'
codec.json:
pretty: false
参数说明:
「参数」 | 「说明」 |
---|---|
「type」 | 输入类型。设置为log,表示输入源为日志。 |
「enabled」 | 设置配置是否生效。true表示生效,false表示不生效。 |
「paths」 | 需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。 |
「hosts」 | 消息队列Kafka实例的接入点。 |
「topic」 | 日志输出到消息队列Kafka的Topic,请指定为已创建的Topic。 |
「注意:」客户端 hosts 添加 kafka 对应 server 的 ip 地址 以及 filebeat 配置建议使用 ansible。
代码语言:javascript复制[root@vm-1# cat /etc/hosts
172.16.62.179 kafka
# 客户端启动服务
[root@vm-1#./filebeat &
「更多配置请参见:」
- 有关 filebeat 的 log input 的配置介绍见官网文档:https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
- 有关 filebeat output 到 kafka 的配置介绍见官方文档:https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
六、配置 Logstash 管道
修改 logstash.conf 内容:
代码语言:javascript复制input {
# # 来源beats
# beats {
# 端口
# port => "5044"
# }
kafka {
bootstrap_servers => "kafka:29092"
topics => ["logs"]
group_id => "logstash"
codec => json
}
}
# 分析、过滤插件,可以多个
# filter {
# grok {
# match => { "message" => "%{COMBINEDAPACHELOG}"}
# }
# geoip {
# source => "clientip"
# }
# }
output {
# 选择elasticsearch
elasticsearch {
hosts => ["http://es:9200"]
#index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{ YYYY.MM.dd}"
index => "logs-%{ YYYY.MM.dd}"
}
}
input 参数说明:
「参数」 | 「说明」 |
---|---|
「bootstrap_servers」 | 消息队列 Kafka 实例的接入点 |
「group_id」 | 指定已创建的 Consumer Group 的名称。 |
「topics」 | 指定为已创建的 Topic 的名称,需要与 Filebeat 中配置的 Topic 名称保持一致。 |
「codec」 | 设置为 json,表示解析 JSON 格式的字段,便于在 Kibana 中分析。 |
output 参数说明:
「参数」 | 「说明」 |
---|---|
「hosts」 | ES的访问地址,取值为「http://<es内网地址>:9200」。 |
「user」 | 访问 ES 的用户名,默认为 elastic。 |
「password」 | 访问 ES 的密码。 |
「index」 | 索引名称。设置为 **logs‐%{ YYYY.MM.dd} **表示索引名称以 logs 为前缀,以日期为后缀,例如 「logs-2021.09.28」。 |
「注意:」logstash 中最为关键的地方在于 filter,为了调试 filter 的配置。
「更多配置请参见:」
- 有关 logstash 中 kafka-input 的配置介绍见官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
- 有关 logstash 中grok-filter 的配置介绍见官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
- 有关 logstash 中 output-elasticsearch 的配置介绍见官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
七、查看 kafka 日志消费状态
操作命令如下:
代码语言:javascript复制# 进入容器
docker exec -it kafka bash
# kafka 默认安装在 /opt/kafka
cd opt/kafka
# 要想查询消费数据,必须要指定组
bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --list
logstash
# 查看 topic
bash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181
__consumer_offsets
logs
# 查看消费情况
bash-5.1# bin/kafka-consumer-groupsdescribe --bootstrap-server 172.16.62.179:9092 --group logstash
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
logstash logs 0 107335 107335 0 logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90 /172.19.0.2 logstash-0
#参数解释:
#--describe 显示详细信息
#--bootstrap-server 指定kafka连接地址
#--group 指定组。
字段解释:
TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|
topic名字 | 分区id | 当前已消费的条数 | 总条数 | 未消费的条数 | 消费id | 主机ip | 客户端id |
从上面的信息可以看出,topic 为 logs 总共消费了 107335 条信息, 未消费的条数为 0。也就是说,消费数据没有积压的情况.
八、查看 ES 内容
通过 elasticsearch-head 插件查看 ES 中是否收到了由 logstash 发送过来的日志
九、通过 Kibana 过滤日志数据
1、创建 index-pattern
打开 es,进入首页后,点击“connect to your Elasticsearch index”
填入 es 中的索引名,支持正则匹配,输入 Index pattern(本文使用 logs-*),单击 Next step。
选择“@timestamp”作为时间过滤字段,然后点击“create index pattern”:
创建完成后:
2、查看日志
在左侧导航栏,单击 Discover。
从页面左侧的下拉列表中,选择已创建的索引模式(logs-*)。在页面右上角,选择一段时间,查看对应时间段内的 Filebeat 采集的日志数据。
十、小结
在企业实际项目中,elk 是比较成熟且广泛使用的技术方案。logstash 性能稍弱于 filebeat,一般不直接运行于采集点,推荐使用filebeat。在日志进入elk前,从经验性角度,前置 kafka,一方面作为队列和缓冲,另一方面提供了统一的入口渠道。
源码地址:
- https://github.com/zuozewei/blog-example/tree/master/Performance-testing/04-full-link/Filebeat-Kafka-Logstash-Elasticsearch-Kibana