导语:用CKafka作一个消息缓冲,用Filebeat收集日志,然后将日志传到Ckafka中。
一、Filebeat的介绍
Beats 平台 集合了多种单一用途数据采集器。这些采集器安装后可用作轻量型代理,从成百上千或成千上万台机器向目标发送采集数据。
Beats 有多种采集器,我们今天下载Filebeat。
二、下载解压
下载地址:https://www.elastic.co/cn/downloads/past-releases/
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.5.3-linux-x86_64.tar.gz
解压:filebeat-6.5.3-linux-x86_64.tar.gz
三、配置filebeat.yml文件
代码语言:javascript复制filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/a.log ###要监控的日志文件
setup.template.settings:
index.number_of_shards: 3
output.kafka:
version:0.10.2 ### 根据不同 CKafka 实例开源版本配置
hosts: ["10.1.3.90:9092"] ###接入方式所用的IP和端口
topic: 'topic_test1' ###topic实例名
partition.round_robin:
reachable_only: false
required_acks: 1
compression: none
max_message_bytes: 10000000
四、运行
运行以下命令,启动客户端。
nohup ./filebeat -e -c filebeat.yml &
五、为监控文件增加数据
六、上传Kafka的客户端消费消息
kafka_2.10-0.10.2.0.tar
解压:
/opt/kafka_2.10-0.10.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.1.3.90:9092 --from-beginning --new-consumer --topic topic_test1
FAQ:
1,加了用户名和密码时:
通不过SASL的验证:
2,没加compression: none
消息内容与其CRC不匹配:
3,filebeat的兼容性
filebeat 5.6的output 支持 Kafka 0.8, 0.9, and 0.10.的服务端版本
filebeat 6.5 的output works with all Kafka versions in between 0.11 and 2.0.0.
Older versions might work as well, but are not supported