CKafka系列学习文章 - Flume接入Ckafka(九)

2019-09-16 11:04:48 浏览数 (1)

导语:当你的业务系统既要对实时数据进行处理也要对离线数据进行分析时,这时候可以来了解一下Flume。

一、 Apache Flume 简介

Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。

Flume 基本结构如下:

Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。单个 agent 由 Source、Sink 和 Channel 三大组件构成。

二、 Flume 与 Kafka

Flume:

把数据存储到 HDFS 或者 HBase 等下游存储模块或者计算模块时需要考虑各种复杂的场景,例如并发写入的量以及系统承载压力、网络延迟等问题。Flume 设计作为灵活的分布式系统具有多种接口,同时提供可定制化的管道。

kafka:

在生产处理环节中,当生产与处理速度不一致时,Kafka 可以充当缓存角色。拥有 partition 结构以及采用 append 追加数据,使 Kafka 具有优秀的吞吐能力;同时其拥有 replication 结构,使 Kafka 具有很高的容错性。

所以将 Flume 和 Kafka 结合起来,可以满足生产环境中绝大多数要求。

三、 filebeat、logstash和flume的特点

在用于做日志收集时,我们发现filebeat、logstash、flume都有类似的功能,它们的特别之处在于:

1. logstash 和filebeat都具有日志收集功能,filebeat更轻量,占用资源更少。filebeat可以发送到logstash,进一步进行日志清洗和过滤。

2. Logstash最值得一提的是,在Filter plugin部分具有比较完备的功能,比如grok,能通过正则解析和结构化任何文本,Grok 目前是Logstash最好的方式对非结构化日志数据解析成结构化和可查询化。此外,Logstash还可以重命名、删除、替换和修改事件字段,当然也包括完全丢弃事件,如debug事件;还有很多的复杂功能供程序员自己选择。

3. Flume本身最初设计的目的是为了把数据传入HDFS中(并不是为了采集日志而设计,这和Logstash有根本的区别),所以理所应当侧重于数据的传输。Flume在集群中最擅长的事情就是做路由了,因为每一个Flume Agent相连就构成了一条链路,这也是众多采集工具中Flume非常出色的亮点

四、 Flume 接入 CKafka

1. 创建Ckafka和创建topic

a. 创建ckafka实例

因为CVM主机跟Ckafka所分配的内网IP不是在一个VPC网络内,所以要添加路由策略的方式来访问Ckafka。

b. 创建topic:

c. 解压已下载的Apache Flume压缩包

http://archive.apache.org/dist/flume/1.7.0/

2 配置 Flume 选项-使用Ckafka作为Sink

a. 编写配置文件

此处重点介绍 Flume 与 CKafka 作为 Sink 结合,Source 和 Channel 使用默认配置。以下是一个简单的 Demo (配置在解压目录的 conf 文件夹下),若无特殊要求则将自己的实例 IP 与 Topic 替换到配置文件当中即可。本例使用的 source 为 tail -F flume-test ,即文件中新增的信息。

代码语言:javascript复制
[root@VM_1_250_centos apache-flume-1.7.0-bin]# cat conf/flume-ckafka-sink.properties 
#ckafka作为sink的demo
agentckafka.sources = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink

###flume的source配置
#设置source类型,根据不同需求而设置
agentckafka.sources.exectail.type = exec
agentckafka.sources.exectail.command = tail -F ./flume-test
agentckafka.sources.exectail.batchSize = 20
#设置source channel
agentckafka.sources.exectail.channels = memoryChannel

###Ckafka作为Sink的配置
#设置sink类型,此处设置为kafka
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
#此处设置ckafka提供的ip:port
agentckafka.sinks.kafkaSink.brokerList = 10.1.3.90:9092
#此处设置需要导入数据的topic,请先在控制台提前创建好topic
agentckafka.sinks.kafkaSink.topic = topic_test1
#设置sink channel
agentckafka.sinks.kafkaSink.channel = memoryChannel

###Channel使用默认配置
#Each channel's type is defined.
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10

# Other config values specific to each type of channel
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000

b. 启动flume

cd /opt/apache-flume-1.7.0-bin

c. 写入消息到flume-test文件中,此时消息将由flume写入到Ckafka

d. 启动kafka的客户端进行消费

3. 使用Ckafka作为Source

a. 编写配置文件。

此处重点介绍Flume与CKafka作为Source结合,Sink和Channel使用默认配置。以下是一个简单的Demo(配置在解压目录的conf文件夹下)。若无特殊要求则将自己的实例IP与Topic替换到配置文件当中即可。此处使用的sink为logger。

代码语言:javascript复制
[root@VM_1_250_centos apache-flume-1.7.0-bin]# cat conf/flume-kafka-source.properties 
# 以kafka作为source的demo
agentckafka.source = kafkaSource
agentckafka.channels = memoryChannel
agentckafka.sinks = loggerSink

#设置source类型,此处设置为kafka
agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
#此处设置ckafka提供的ip:port
agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 10.1.3.90:9092
#此处设置需要导出数据的topic,请先在控制台提前创建好topic
agentckafka.sources.kafkaSource.kafka.topics = topic_test1
#设置找不到offset数据时的处理方式
agentckafka.sources.kafkaSource.kafka.consumer.auto.offset.reset = earliest
#设置source channel
agentckafka.sources.kafkaSource.channels = memoryChannel

#设置sink
agentckafka.sinks.loggerSink.type = logger
#设置sink channel
agentckafka.sinks.loggerSink.channel = memoryChannel

#Each channel's type is defined
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10

#Other config values specific to each type of channel(sink or source)
#can be defined as well
#In this case, it specifies the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000

b. 启动flume

Cd /opt/apache-flume-1.7.0-bin

0 人点赞