Flume学习笔记「建议收藏」

2022-11-16 18:31:10 浏览数 (2)

1.基于尚硅谷做的笔记 2.也参考了几篇我觉得写得比较好的博客,参考链接在文中 3.此外,我也将我在操作过程中遇到的问题以及解决方案都记录了下来

Flume 定义

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单. Flume最主要的作用就是:实时读取服务器本地磁盘的数据,将数据写入到HDFS. 文档查看地址

Flume 基础架构

  1. Agent Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。 Agent 主要有 3 个部分组成:Source、Channel、Sink
  2. Source Source 是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directorynetcattaildir、sequence generator、syslog、http、legacy。
  3. Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 组件目的地包括 hdfsloggeravro、thrift、ipc、file、HBase、solr、自定义。
  4. Channel Channel 是位于 Source 和 Sink 之间的缓冲区。 因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。 Flume 自带两种 Channel:Memory Channel File Channel。 Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。 File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  5. Event 传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。 Event 由Header 和 Body两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

Flume 安装部署

(1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下 (2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下

(3)修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0

(4)将 flume-1.9.0/lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

监控端口数据

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台: (1)分析

(2)安装 netcat 工具

(3)判断 44444 端口是否被占用

-n 以数字形式显示地址和端口号 -l 仅列出有在 Listen (监听) 的服务状态。 -p 显示建立相关连接的程序名(如下最后一行)

(4)在 flume-1.9.0 目录下创建 jobs 文件夹并进入 jobs 文件夹。

(5)在 jobs 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。

(6)在 flume-netcat-logger.conf 文件中添加如下内容:

(7)先开启 flume 监听端口 第一种写法:

第二种写法:

参数说明: --conf/ 或 -c:表示配置文件存储在 conf/目录下 --name/ 或 -n:表示给 agent 起名为 a1 --conf-file/ 或 -f:flume 本次启动读取的配置文件是在 jobs 文件夹下的 flume-telnet.conf文件。 -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。 (8)使用 netcat 工具向本机的 44444 端口发送数据

(9)在 Flume 监听页面观察接收数据情况

实时监控单个追加文件

实时监控 Hive 日志,并上传到 HDFS 中 分析:

(1)Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包 检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确 (2)创建 flume-file-hdfs.conf 文件(添加如下内容:)

注意:端口号要和$HADOOP_HOME/etc/hadoop/core-site.xml中的fs.defaultFS的端口号一致

注意:对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)

(3)运行 Flume

(4)启动 Hadoop 和 Hive 并操作 Hive 产生日志

(5)在 HDFS 上查看文件。

实时监控目录下多个新文件

使用 Flume 监听整个目录的文件,并上传至 HDFS(实时读取目录文件到HDFS) 1.创建配置文件 flume-dir-hdfs.conf

代码语言:javascript复制
#Name the components on this agent
#a1表示agent的名称
# r1表示a1的source的名称
a1.sources = r1
#k1表示a1的sink的名称 
a1.sinks = k1 
#c1表示a1的channel的名称 
a1.channels = c1  
# Describe/configure the source
#定义source类型为目录
a1.sources.r1.type = spooldir  
#被监控的目录
a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/upload
#定义文件上传完 后缀(.COMPLETED)
a1.sources.r1.fileSuffix = .COMPLETED
#是否有文件头
a1.sources.r1.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
#定义sink类型为hdfs,表示a1的输出目的地是hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#设置每个批次刷新到HDFS上的events数量
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型为DataStream(普通文本),文件不会被压缩
a1.sinks.k1.hdfs.fileType = DataStream
#设置60秒生成一个新的文件(间隔60秒将临时文件滚动成最终目标文件)
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
#表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
#表示a1的channel总容量是1000个event
a1.channels.c1.capacity = 1000
#表示a1的channel从source接收或给sink的最大事件数
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
#表示将r1和c1连接起来
a1.sources.r1.channels = c1
#表示将k1和c1连接起来
a1.sinks.k1.channel = c1

Jetbrains全家桶1年46,售后保障稳定

2.启动监控文件夹命令

说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。 3.向 upload 文件夹中添加文件

4.查看 HDFS 上的数据

实时监控目录下的多个追加文件

  1. Exec source 适用于监控一个实时追加的文件,不能实现断点续传;
  2. Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
  3. Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

(1)创建配置文件 flume-taildir-hdfs.conf

添加如下内容:

代码语言:javascript复制
#Name the components on this agent
#a1表示agent的名称
a1.sources = r1   # r1表示a1的source的名称
a1.sinks = k1     #k1表示a1的sink的名称
a1.channels = c1  #c1表示a1的channel的名称
# Describe/configure the source
#定义source类型为taildir
a1.sources.r1.type = TAILDIR  
#JSON格式的文件,记录inode、绝对路径和 读取到的最新的位置
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/tail_dir.json
#以空格分隔的文件组列表,每个文件组表示要尾处理的一组文件
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.*
# Describe the sink
#定义sink类型为hdfs,表示a1的输出目的地是hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#设置每个批次刷新到HDFS上的events数量
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型为DataStream(普通文本),文件不会被压缩
a1.sinks.k1.hdfs.fileType = DataStream
#设置60秒生成一个新的文件(间隔60秒将临时文件滚动成最终目标文件)
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
#表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
#表示a1的channel总容量是1000个event
a1.channels.c1.capacity = 1000
#表示a1的channel从source接收或给sink的最大事件数
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
#表示将r1和c1连接起来
a1.sources.r1.channels = c1
#表示将k1和c1连接起来
a1.sinks.k1.channel = c1

(2)创建files1和files2文件夹

(3)启动监控文件夹命令

(4)向 files1和files2文件夹中追加内容

(5)查看 HDFS 上的数据

Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

Flume 事务

首先Source采集数据,将其封装为event,执行Channel的doPut方法,将event先放入putList里面临时缓存起来,当到达一定的量(batchSize)的时候(或在一定的时间内,如:exec Source:batchTimeout 如果未达到batch data大小,将数据推送到Channel之前等待的时间(以毫秒为单位),默认值为3000)),执行doCommit方法. Put事物流程:

  1. doPut:将批数据写入临时缓冲区putList,putList大小取决于配置Channel的参数transactionCapacity
  2. doCommit:检查Channel内存队列是否足够合并.当Channel的容量足够的时候,提交event成功,并清空putList,开始下一批数据的写入.
  3. doRollback:当Channel的内存队列空间不够时,回滚数据,并清空putList.然后给Source返回一个ChannelException异常

Take事务流程:

  1. sink不断从Channel中拉取event,每拉取一个event,这个event会先放到临时缓冲区takeList当中,takeList大小取决于配置Channel的参数transactionCapacity.
  2. 当一个batchSize的event全部拉取到takeList中,由sink执行写出处理.

今天一直在想一个问题:若是takeList中的event没有达到batchSize,且后面也不会再来数据了,sink还会执行写出处理吗?这让我想到了上面的监控端口数据并上传到HDFS上的案例,虽然只有短短的一两行数据,但是sink还是会执行写出处理的.于是我又想HDFS sink会不会也有一个类似于exec Source:batchTimeout 这样的一个属性,但是呢,在官网上没有找到类似于这样的一个属性.后来去百度查找sink的源码,找到下面这篇博客,终于明白了. 代码参考链接:http://blog.51cto.com/10120275/2052970

代码语言:javascript复制
//HDFS sink
public  Status process()  throws  EventDeliveryException { 

Channel channel = getChannel();  //调用父类getChannel方法,建立Channel与Sink之间的连接
Transaction transaction = channel.getTransaction(); //每次batch提交都建立在一个事务上
transaction.begin();
try  { 

Set<BucketWriter> writers =  new  LinkedHashSet<>();
int  txnEventCount =  0 ;
for  (txnEventCount =  0 ; txnEventCount < batchSize; txnEventCount  ) { 

Event event = channel.take(); //从Channel中取出event
if  (event ==  null ) { 
 //没有新的event的时候,则不需要按照batchSize循环取
break ;
}
...
}
...
}
...
  1. doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  2. doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区t akeList中的数据归还给channel内存队列。这个操作可能会导致数据重复,如果已经写入一半的event到了HDFS,但是回滚时会向Channel归还整个takeList中的event,后续再次开启事务向HDFS写入这批event时,就出现了数据重复.

Put事务可能会丢数据,如果是像TAILDIR这种可以记录位置信息的source就不会丢数据(只有当事务提交成功之后才会更新位置信息) Take事务可能会导致数据重复 batchSize: 每个Source和Sink都可以配置一个batchSize的参数。这个参数代表一次性到channel中put|take 多少个event transactionCapacity: putList和takeList的初始值 capacity: channel中存储event的总容量 batchSize <= transactionCapacity <= capacity

Flume Agent 内部原理

重要组件:

  1. ChannelSelector ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制) Multiplexing(多路复用)。 ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。
  2. SinkProcessor SinkProcessor 共 有 三 种 类 型 , 分 别 是DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor. DefaultSinkProcessor 对 应 的 是 单 个 Sink. LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。

Flume 拓扑结构

简单串联

这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

复制和多路复用

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地.

负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

聚合

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

Flume开发案例

复制和多路复用

案例需求 : 使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

(1)在/opt/module/flume-1.9.0/jobs 目录下创建 group1 文件夹

(2)在/opt/module目录下创建多级文件夹(一定要先创建datas/flume3,否则会报错)

(3)创建 flume-file-flume.conf 配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。

添加内容如下:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(4)创建 flume-flume-hdfs.conf 配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2- 
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型为DataStream(普通文本),文件不会被压缩
a2.sinks.k1.hdfs.fileType = DataStream
#设置30秒生成一个新的文件(间隔30秒将临时文件滚动成最终目标文件)
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700 
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
#putList和takeList的大小
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(5)创建 flume-flume-dir.conf 配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
#putList和takeList的大小
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

提示:输出的本地目录/opt/module/datas/flume3必须是已经存在的目录,如果该目录不存在,并不会创建新的目录 (6)执行配置文件 分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。(注意:先启服务端,再启客户端)

(7)启动 Hadoop 和 Hive

(8)检查 HDFS 上的数据,检查/opt/module/datas/flume3 目录中的数据

负载均衡和故障转移

案例需求 : 使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。

故障转移机制的工作原理是:将失败的sink降级到池中,在池中为它分配一个冷却期,在重试之前,随着连续故障的增加,冷却时间也增加。 一旦sink成功发送event,则将其还原到live池。 假如在一个Agent中,一个Channel对应三个Sink,Sink1和Sink2,Sink3,优先级为Sink1>Sink2>Sink3,所以Channel过来的数据总是先给Sink1,如果Sink1挂掉以后在最大回退周期(也就是processor.maxpenalty属性值,单位毫秒)内自动起来的话Channel传来的数据是不会再考虑Sink1的,他会传给下一个优先级比较高的Sink2,等到maxpenalty时间到时才会考虑Sink1;如果Sink1恢复的时间大于processor.maxpenalty这个值的话那么在下一次传输数据给Sink的时候还是会优先的考虑给Sink1. 参考链接 sink有一个与它们相关联的优先级,数量越大,优先级越高。 如果sink在发送事件时失败,则接下来将尝试具有最高优先级的下一个sink发送事件。 例如,优先级为100的sink在优先级为80的sink之前被激活。 如果没有指定优先级,则根据配置中指定的sink的顺序确定优先级。 (1)在/opt/module/flume-1.9.0/jobs 目录下创建 group2 文件夹

(2)创建 flume-netcat-flume.conf 配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
#优先级更高的值将在更早时激活sink
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(3)创建 flume-flume-console1.conf 配置上级 Flume 输出的 Source,输出是到本地控制台。

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建 flume-flume-console2.conf 配置上级 Flume 输出的 Source,输出是到本地控制台

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5)执行配置文件 分别开启对应配置文件:flume-flume-console2.conf,flume-flume-console1.conf,flume-netcat-flume.conf。

(6)使用 netcat 工具向本机的 44444 端口发送数据

(7)查看 Flume2 及 Flume3 的控制台打印日志 (8)将 Flume2 kill,观察 Flume3 的控制台打印情况。 注:使用 jps -ml 查看 Flume 进程

负载均衡:只需要将flume-netcat-flume.conf配置文件修改为以下内容:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = load_balance
#失败的sink以指数方式退避
a1.sinkgroups.g1.processor.backoff=true
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

以下是转载 Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。

  1. 默认支持了轮询(round_robin)和随机(random)两种选择机制分配负载。
  2. 也可以通过继承自AbstractSinkSelector的自定义类来写一个自定义选择器。

工作时,sink选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选的sink无法正常工作,则sink处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的sink列入黑名单,而是继续乐观地尝试每个可用的sink。 如果所有sink调用都失败了,选择器会将故障抛给sink的运行器。

如果backoff设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。 如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的。 flume轮询是每隔一段时间轮询,而不是每秒轮询一次。所以可能多条在同一时间间隔内的events都被输出到一个sink端. exponential backoff是flume的一种规避算法,规避时间以2的指数倍递增。

聚合

案例需求: hadoop102 上的 Flume-1 监控文件/opt/module/group.log, hadoop103 上的 Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。

(1)分发flume-1.9.0

xsync脚本:

代码语言:javascript复制
#!/bin/bash
if [ $# -lt 1 ]
then
echo "Not Enough Arguement!"
exit;
fi
for host in hadoop102 hadoop103 hadoop104
do
echo =========== $host ===========
for file in $@
do
if [ -e $file ]
then
pdir=$(cd -P $(dirname $file);pwd)
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done

(2)在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume-1.9.0/jobs 目录下创建一个group3 文件夹

(3)创建 flume1-logger-flume.conf 配置 Source 用于监控 group.log 文件,配置 Sink 输出数据到下一级 Flume。 在 hadoop102 上编辑配置文件

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)创建 flume2-netcat-flume.conf 配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume: 在 hadoop103 上编辑配置文件

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(5)创建 flume3-flume-logger.conf 配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。 在 hadoop104 上编辑配置文件

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(6)执行配置文件 分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。

(7)在 hadoop102上向/opt/module 目录下的 group.log 追加数据

(8)在 hadoop103上向 44444 端口发送数据

(9)检查 hadoop104 上数据

自定义 Interceptor

1)案例需求 使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。 2)需求分析 在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。 在该案例中,我们以端口数据模拟日志,以是否包含”flume”模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含”flume”,将其分别发往不同的分析系统(Channel)

(1)创建一个 maven 项目,并引入以下依赖。

代码语言:javascript复制
	<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>

(2)定义 TypeInterceptor 类并实现 Interceptor 接口。

代码语言:javascript复制
package com.hyj.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TypeInterceptor implements Interceptor { 

//声明一个存放事件的集合
private List<Event> addHeaderEvents;
@Override
public void initialize() { 

//初始化存放事件的集合
addHeaderEvents=new ArrayList<>();
}
//单个事件拦截
@Override
public Event intercept(Event event) { 

//获取事件中的头信息
Map<String, String> headers = event.getHeaders();
//获取事件中的body信息
String body = new String(event.getBody());
//根据body中是否有"flume"来决定添加怎样的头信息
if(body.contains("flume")){ 

//添加头信息
headers.put("type","first");
}else{ 

//添加头信息
headers.put("type","second");
}
return event;
}
//批量事件拦截
@Override
public List<Event> intercept(List<Event> events) { 

//清空集合
addHeaderEvents.clear();
//遍历events
for (Event event : events) { 

//给每一个事件添加头信息
addHeaderEvents.add(intercept(event));
}
//返回结果
return addHeaderEvents;
}
@Override
public void close() { 

}
public static class Builder implements Interceptor.Builder{ 

//该方法主要用来返回自定义类拦截器对象
@Override
public Interceptor build() { 

return new TypeInterceptor();
}
@Override
public void configure(Context context) { 

}
}
}

(3)将此程序打包上传到/opt/module/flume-1.9.0/lib下 (4)在hadoop102,hadoop103和hadoop104的/opt/module/flume-1.9.0/jobs目录下创建group4文件夹

(5)编辑 flume 配置文件 为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#以空格分隔的拦截器列表,i1表示拦截器名称
a1.sources.r1.interceptors = i1
# 全类名$Builder
a1.sources.r1.interceptors.i1.type = com.hyj.flume.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
# key -- type
a1.sources.r1.selector.header = type
# 如果头信息中的type对应的值是first则发往c1
a1.sources.r1.selector.mapping.first = c1
# 如果头信息中的type对应的值是second则发往c2
a1.sources.r1.selector.mapping.second = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(6)为 hadoop103 上的 Flume2配置一个 avro source 和一个 logger sink

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1

(7)为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4242
# Describe the sink
a3.sinks.k1.type = logger
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

(8)分别在 hadoop102,hadoop103,hadoop104 上启动 flume 进程,注意先后顺序。

若是在这过程中遇到以下问题:

解决办法

(9)在 hadoop102 使用 netcat 向 localhost:44444 发送数据。

(10)观察 hadoop103 和 hadoop104 打印的日志

自定义 Source

  1. 官方也提供了自定义 source 的接口 根据官方说明自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。 实现相应方法: getBackOffSleepIncrement() //backoff 步长 getMaxBackOffSleepInterval() //backoff 最长时间 configure(Context context) //初始化 context(读取配置文件内容) process() //获取数据,将其封装成 event 并写入 channel,这个方法将被循环调用。
  2. 需求 使用 flume 接收数据,并给每条数据添加前后缀,输出到控制台。前缀可从 flume 配置文件中配置。

(1)添加 pom 依赖

代码语言:javascript复制
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>

(2)编写代码

代码语言:javascript复制
package com.hyj.flume;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource { 

//定义配置文件将来要读取的字段(声明数据的前后缀)
private String prefix;
private String subfix;
private Long delay;
//初始化配置信息
@Override
public void configure(Context context) { 

/*prefix,subfix,delay会尝试去配置文件中看有没有这个key("pre","sub","delay"),如果有则用配置文件中的value,如果没有则用默认值("pre-","2000L") */
prefix=context.getString("pre","pre-");
subfix=context.getString("sub"); //默认值为null
delay=context.getLong("delay",2000L);
}
@Override
public Status process() throws EventDeliveryException { 

//创建事件头信息
HashMap<String, String> hearderMap = new HashMap<>();
//循环创建事件信息,传给Channel
try { 

for (int i = 0; i < 5; i  ) { 

//创建事件
SimpleEvent event = new SimpleEvent();
// 给事件设置头信息
event.setHeaders(hearderMap);
//给事件设置内容
event.setBody((prefix "flume" i subfix).getBytes());
//将事件写入Channel
getChannelProcessor().processEvent(event);
}
Thread.sleep(delay);
return Status.READY;
} catch (InterruptedException e) { 

e.printStackTrace();
return Status.BACKOFF;
}
}
@Override
public long getBackOffSleepIncrement() { 

return 0;
}
@Override
public long getMaxBackOffSleepInterval() { 

return 0;
}
}

(3)测试 a.将写好的代码打包,上传到/opt/module/flume-1.9.0/lib目录下 b.配置文件

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.hyj.flume.MySource
a1.sources.r1.pre=flume-
#a1.sources.r1.sub=$
a1.sources.r1.delay=3000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c.开启任务

自定义 Sink

  1. 介绍: Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。 官方提供了自定义 sink 的接口 根据官方说明自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口。 实现相应方法: configure(Context context) //初始化 context(读取配置文件内容) process() //从 Channel 读取获取数据(event),这个方法将被循环调用。
  2. 需求 使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。 configure():读取任务配置文件中的配置信息。 process():从Channel中取数据,添加前后缀,写入日志。 (1)编写代码
代码语言:javascript复制
package com.hyj.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable { 

private String prefix;
private String subfix;
//创建Logger对象 (哪个类打印数据就用哪个类MySink.class)
private Logger logger= LoggerFactory.getLogger(MySink.class);
@Override
public void configure(Context context) { 

//读取配置文件内容,有默认值
prefix=context.getString("pre","pre-");
//读取配置文件内容,无默认值
subfix=context.getString("sub");
}
@Override
public Status process() throws EventDeliveryException { 

//声明返回值状态信息
Status status;
//获取当前 Sink 绑定的 Channel
Channel channel= getChannel();
//获取事务
Transaction transaction = channel.getTransaction();
//开启事务
transaction.begin();
try { 

Event event;
//从Channel中拉取数据打印到控制台
while (true){ 

event= channel.take();
if(event!=null){ 

break;
}
}
//处理事件(打印)
logger.info(prefix new String(event.getBody()) subfix);
//提交事务
transaction.commit();
return Status.READY;
} catch (ChannelException e) { 

//遇到异常,事务回滚
transaction.rollback();
return Status.BACKOFF; //退避
}finally { 

//关闭事务
transaction.close();
}
}
}

(2)测试 a.将写好的代码打包,上传到/opt/module/flume-1.9.0/lib目录下 b.配置文件

添加如下内容:

代码语言:javascript复制
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.hyj.flume.MySink
#a1.sinks.k1.pre = p
a1.sinks.k1.sub = s
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)开启任务

Flume 数据流监控

Ganglia 的安装与部署

Ganglia 由 gmond、gmetad 和 gweb 三部分组成。 gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。 gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。 gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据. gmond收集,gmetad存储,gweb展示 1)安装 ganglia (1)规划

代码语言:javascript复制
hadoop102: web gmetad gmod
hadoop103: gmod
hadoop104: gmod

(2)在 hadoop102,hadoop103,hadoop104上分别安装 epel-release

(3)在 hadoop102 安装

(4)在 hadoop103 和 hadoop104 安装

2)在 hadoop102 修改配置文件/etc/httpd/conf.d/ganglia.conf

添加Require ip 192.168.10.102

3)在 hadoop102 修改配置文件/etc/ganglia/gmetad.conf

修改为:

4)在 hadoop102, hadoop103, hadoop104 修改配置文件/etc/ganglia/gmond.conf

修改为:

代码语言:javascript复制
cluster { 

name = "my cluster"  #修改此处
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel { 

#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source 
address
# that resolves to the machine's hostname. 
Without
# this, the metrics may appear to come from 
any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
# 数据发送给 hadoop102
host = hadoop102  #修改此处
port = 8649
ttl = 1
}
udp_recv_channel { 

# mcast_join = 239.2.11.71
port = 8649
# 接收来自任意连接的数据
bind = 0.0.0.0   #修改此处
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics 
you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}

分发 /etc/ganglia/gmond.conf

5)在 hadoop102 修改配置文件/etc/selinux/config

修改为:

代码语言:javascript复制
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled  #修改此处
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted

尖叫提示:selinux 生效需要重启,如果此时不想重启,可以临时生效之:

6)启动 ganglia (1)在 hadoop102, hadoop103, hadoop104 启动

(2)在 hadoop102 启动

7)打开网页浏览 ganglia 页面 http://hadoop102/ganglia 尖叫提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录的权限

8)若是还是出现You don’t have permission to access /ganglia on this server.则修改/etc/httpd/conf.d/ganglia.conf为:

代码语言:javascript复制
<Location /ganglia>
#Order deny,allow
Require all granted
#Deny from all
#Allow from 127.0.0.1
#Allow from ::1
# Allow from .example.com
</Location>

然后重启httpd就可以了

操作 Flume 测试监控

1)启动 Flume 任务

2)发送数据观察 ganglia 监测图

字段(图表名称)

字段含义

EventPutAttemptCount

source尝试写入Channel的事件总数量

EventPutSuccessCount

成功写入Channel且提交的事件总数量

EventTakeAttemptCount

sink尝试从Channel拉取事件的总数量

EventTakeSuccessCount

sink成功读取的事件的总数量

StartTime

Channel启动的时间(毫秒)

StopTime

Channel停止的时间(毫秒)

ChannelSize

目前Channel中事件的总数量

ChannelFillPercentage

Channel占用百分比

ChannelCapacity

Channel的容量

你是如何实现 Flume 数据传输的监控的

使用第三方框架 Ganglia 实时监控 Flume。

Flume 的 Source,Sink,Channel 的作用?

(1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy (2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。 (3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义.

Flume 的 Channel Selectors

Flume 参数调优

1)Source 增加 Source (使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。 batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。 2)Channel type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。 使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。 Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大event 条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。 3)Sink 增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好,够用就行,过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。 batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。

Flume 的事务机制

Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。 比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。 同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。

Flume 采集数据会丢失吗?

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。 Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/234526.html原文链接:https://javaforall.cn

0 人点赞