Flume

2023-10-17 15:41:16 浏览数 (1)

1 Flume丢包问题

  单机upd的flume source的配置,100 M/s数据量,10w qps flume就开始大量丢包,因此很多公司在搭建系统时,抛弃了Flume,自己研发传输系统,但是往往会参考Flume的Source-Channel-Sink模式。

  一些公司在Flume工作过程中,会对业务日志进行监控,例如Flume agent中有多少条日志,Flume到Kafka后有多少条日志等等,如果数据丢失保持在1%左右是没有问题的,当数据丢失达到5%左右时就必须采取相应措施。

2 Flume与Kafka的选取

  采集层主要可以使用Flume、Kafka两种技术。

  Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API。

  Kafka:Kafka是一个可持久化的分布式的消息队列。

  Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

如果需要向HDFS写入数据,Flume需要安装在Hadoop集群上,否则会找不到HDFS文件系统。

  Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。

  Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

  Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

3 日志数据如何采集到Kafka?

  日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

可选择TaildirSource和KafkaChannel,并配置日志校验拦截器

3.1 TailDirSource

  TailDirSource相比ExecSource、SpoolingDirectorySource的优势:

  TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。(Apache1.7、CDH1.6版本开始存在)

taildir挂了不会丢数(断点续传),但是有可能数据重复,生产环境通常不处理重复数据,出现重复的概率比较低。处理会影响传输效率。可以在下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis;如果需要在Flume处理则可以在taildirsource里面增加自定义事务。

taildir source不支持递归遍历文件夹读取文件。

  ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

  SpoolingDirectorySource监控目录,支持断点续传。

3.2 KafkaChannel

  采用Kafka Channel,可以省去Sink,提高效率。

日志采集Flume关键配置如下:

4 flume不采集Nginx日志,通过Logger4j采集日志,优缺点是什么?

  优点:Nginx的日志格式是固定的,但是缺少sessionid,通过logger4j采集的日志是带有sessionid的,而session可以通过redis共享,保证了集群日志中的同一session落到不同的tomcat时,sessionId还是一样的,而且logger4j的方式比较稳定,不会宕机。

  缺点:不够灵活,logger4j的方式和项目结合过于紧密,而flume的方式比较灵活,拔插式比较好,不会影响项目性能。

5 flume和kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志。

Flume采集日志是通过流的方式直接将日志收集到存储层,而kafka是将缓存在kafka集群,待后期可以采集到存储层。

Flume采集中间停了,可以采用文件的方式记录之前的日志,而kafka是采用offset的方式记录之前的日志。

6 flume的source、channel、sink具体是做什么的

  1)source:用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

  Source输入端类型有Avro、Thrift、exec、netcat等,企业中最常用的还是采集日志文件。

  2)channel:用于桥接Sources和Sinks,类似于一个队列。

    ① Channel 被设计为 Event 中转临时缓冲区,存储 Source 收集并且没有被Sink 读取的 Event,为平衡 Source 收集和 Sink 读取的速度,可视为 Flume内部的消息队列。

    ② Channel 线程安全并且具有事务性,⽀持 Source 写失败写,和 Sink 读失败重复读的操作。常⻅的类型包括 Memory Channel, File Channel,Kafka Channel。

  3)sink:从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

Source到Channel是Put事务,Channel到Sink是Take事务

7 file channel /memory channel/kafka channel

(1)File Channel

  数据存储于磁盘,优势:可靠性高;劣势:传输速度低

  默认容量:100万event

  注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

(2)Memory Channel

   数据存储于内存,优势:传输速度快;劣势:可靠性差

  默认容量:100个event

(3)Kafka Channel

  数据存储于Kafka,基于磁盘;

  优势:可靠性高;

  传输速度快 Kafka Channel 大于Memory Channel Kafka Sink 原因省去了Sink阶段

(4)Kafka Channel哪个版本产生的?

  Flume1.6 版本产生=》并没有火;因为有bug:event(header body ) ture 和false 控制是否包含header信息,很遗憾,都不起作用。增加了额外清洗的工作量。Flume1.7解决了这个问题,开始火了。

(5)生产环境如何选择

如果下一级是Kafka,优先选择Kafka Channel

如果是金融、对钱要求准确的公司,选择File Channel

如果就是普通的日志,通常可以选择Memory Channel

8 HDFS sink

  时间(半个小时) – hdfs.rollInterval=1800

  大小128m – hdfs.rollSize=134217728

  event个数(0禁止)-- hdfs.rollCount =0

9 Flume拦截器

9.1 拦截器注意事项

  (1)ETL拦截器:主要是用来判断json是否完整。没有做复杂的清洗操作主要是防止过多的降低传输速率。

  (2)时间戳拦截器:主要是解决零点漂移问题

9.2 自定义拦截器

  Source 将 Event 写⼊到 Channel 之前可以使⽤拦截器对 Event 进⾏各种形式的处理, Source 和 Channel 之间可以有多个拦截器,不同拦截器使⽤不同的规则处理 Event,包括时间、主机、 UUID、正则表达式等多种形式的拦截器。

  自定义拦截器步骤:

(1)实现 Interceptor

(2)重写四个方法

  initialize 初始化

  public Event intercept(Event event) 处理单个Event

  public List intercept(List events) 处理多个Event,在这个方法中调用Event intercept(Event event)

  close方法

(3)静态内部类,实现Interceptor.Builder

9.3 拦截器可以不用吗?

  ETL拦截器可以不用;需要在下一级Hive的dwd层和SparkSteaming里面处理,时间戳拦截器建议使用。 如果不用需要采用延迟15-20分钟处理数据的方式,比较麻烦。

10 Flume Channel选择器

  Source 发送的 Event 通过 Channel 选择器来选择以哪种⽅式写⼊到 Channel中, Flume 提供三种类型 Channel 选择器,分别是复制、复⽤和⾃定义选择器。

  1. 复制选择器(Replicating:默认选择器): ⼀个 Source 以复制的⽅式将⼀个 Event 同时写⼊到多个Channel 中,不同的 Sink 可以从不同的 Channel 中获取相同的 Event,⽐如⼀份⽇志数据同时写 Kafka 和 HDFS,⼀个 Event 同时写⼊两个Channel,然后不同类型的 Sink 发送到不同的外部存储。(将数据发往下一级所有通道

  2. 复⽤选择器(Multiplexing): 需要和拦截器配合使⽤,根据 Event 的头信息中不同键值数据来判断 Event 应该写⼊哪个 Channel 中。(选择性发往指定通道

11 Flume监控器

  1)采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。主要是内存不够导致的。

  2)解决办法?

    (1)自身:flume默认内存2000m。考虑增加flume内存,在flume-env.sh配置文件中修改flume内存为 4-6g

    -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

    (2)找朋友:增加服务器台数

    搞活动 618 =》增加服务器=》用完在退出

    日志服务器配置:8-16g内存、磁盘8T

12 Flume 的负载均衡和故障转移

  ⽬的是为了提⾼整个系统的容错能⼒和稳定性。简单配置就可以轻松实现,⾸先需要设置 Sink 组,同⼀个 Sink 组内有多个⼦ Sink,不同 Sink 之间可以配置成负载均衡或者故障转移。

0 人点赞