CKafka系列学习文章 - Logstash接入CKafka (八)

2019-09-16 11:04:53 浏览数 (1)

导语:前面一章讲了Filebeat对接Ckafka,通常的场景是各种beats将数据存到CKafka,然后Logstash将从Ckafka中消息消息进行过滤,再经过Ckafka存入到Elasticsearch 。

一、 Logstash 简介

Logstash 是一个开源的日志处理工具,它可以从多个源头收集数据、过滤收集的数据以及对数据进行存储作为其他用途。

Logstash 灵活性强并且拥有强大的语法分析功能,其插件丰富,支持多种输入和输出源;同时其作为水平可伸缩的数据管道与 Elasticsearch 和 Kibana 配合在日志收集检索方面功能强大。

二、 Logstash 工作原理

Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。

1. inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。

2. filter:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器如下:grok、mutate、drop 和 clone 等。

3. outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。

同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。

三、 Logstash 接入 Kafka 的优势

  1. · 可以异步处理数据,防止突发流量。
  2. · 解耦,当 Elasticsearch 异常的时候不会影响上游工作。
  3. · Logstash 过滤消耗资源,如果部署在生产 server 上会影响其性能。

四、 CKafka 接入

1准备工作

· Java 版本:java 8

· Logstash 版本:5.5.2 :

· Logstash Ckafka 实例,并且创建相应 topic

Logstash下载地址:https://www.elastic.co/cn/downloads/past-releases/logstash-5-5-2

检查Logstash对kafka的支持:

2将logstash的output作为Ckafka的生产者

配置文件:

cd /opt/logstash-5.5.2/bin

[root@VM_1_250_centos bin]# cat ../config/output.conf

代码语言:javascript复制
input {
    stdin{}
}

output {
    kafka {
        bootstrap_servers => "10.1.3.90:9092"
        topic_id => "topic_test1"
    }
}

./logstash -f ../config/output.conf

用logstash生产消息到Ckafka:

用kafka的客户端从Ckafka中消费消息:

3、将logstash的input作为Ckafka的消费者

配置文件:

[root@VM_1_250_centos bin]# cat ../config/input.conf

代码语言:javascript复制
input {
 kafka {
     bootstrap_servers => "10.1.3.90:9092" # ckafka vip 实例地址
     group_id => "console-consumer-92728"  # ckafka groupid 名称
     topics => ["topic_test1"] # ckafka topic 名字
     consumer_threads => 1 # 消费线程数,一般跟 ckafka 分区数一致
     auto_offset_reset => "latest"
 }
}
output {
 stdout{codec=>rubydebug}
}

用kafka的客户端生产消息到CKafka中:

用logstash的input作为Ckafka的消费者:

0 人点赞