导语:前面一章讲了Filebeat对接Ckafka,通常的场景是各种beats将数据存到CKafka,然后Logstash将从Ckafka中消息消息进行过滤,再经过Ckafka存入到Elasticsearch 。
一、 Logstash 简介
Logstash 是一个开源的日志处理工具,它可以从多个源头收集数据、过滤收集的数据以及对数据进行存储作为其他用途。
Logstash 灵活性强并且拥有强大的语法分析功能,其插件丰富,支持多种输入和输出源;同时其作为水平可伸缩的数据管道与 Elasticsearch 和 Kibana 配合在日志收集检索方面功能强大。
二、 Logstash 工作原理
Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。
1. inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。
2. filter:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器如下:grok、mutate、drop 和 clone 等。
3. outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。
同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。
三、 Logstash 接入 Kafka 的优势
- · 可以异步处理数据,防止突发流量。
- · 解耦,当 Elasticsearch 异常的时候不会影响上游工作。
- · Logstash 过滤消耗资源,如果部署在生产 server 上会影响其性能。
四、 CKafka 接入
1 、准备工作
· Java 版本:java 8
· Logstash 版本:5.5.2 :
· Logstash Ckafka 实例,并且创建相应 topic
Logstash下载地址:https://www.elastic.co/cn/downloads/past-releases/logstash-5-5-2
检查Logstash对kafka的支持:
2 、将logstash的output作为Ckafka的生产者
配置文件:
cd /opt/logstash-5.5.2/bin
[root@VM_1_250_centos bin]# cat ../config/output.conf
代码语言:javascript复制input {
stdin{}
}
output {
kafka {
bootstrap_servers => "10.1.3.90:9092"
topic_id => "topic_test1"
}
}
./logstash -f ../config/output.conf
用logstash生产消息到Ckafka:
用kafka的客户端从Ckafka中消费消息:
3、将logstash的input作为Ckafka的消费者
配置文件:
[root@VM_1_250_centos bin]# cat ../config/input.conf
代码语言:javascript复制input {
kafka {
bootstrap_servers => "10.1.3.90:9092" # ckafka vip 实例地址
group_id => "console-consumer-92728" # ckafka groupid 名称
topics => ["topic_test1"] # ckafka topic 名字
consumer_threads => 1 # 消费线程数,一般跟 ckafka 分区数一致
auto_offset_reset => "latest"
}
}
output {
stdout{codec=>rubydebug}
}
用kafka的客户端生产消息到CKafka中:
用logstash的input作为Ckafka的消费者: