Flume浅度学习指南

2019-08-08 15:40:26 浏览数 (1)

Flume简介

cloudera 公司开源的,贡献给Apache基金会

http://flume.apache.org/

http://archive.cloudera.com/c...

只能运行在linux系统上

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

flume用来高效的收集、聚合、移动大量的日志数据

有一个基于流式的简单的有弹性的传输模型

有一个健壮的可容错的机制

使用简单,可以扩展的数据模型运行使用到在线实时分析应用中

简单体现在flume-agent的配置及传输模型简单

在线实时分析应用中

flume日志的实时采集->SparkStreaming/Storm/Flink =>mysql/redis=>实时分析的结果进行报表展示

数据(日志)的移动传输工具:

代码语言:javascript复制
日志=>系统运行日志、web服务器的访问日志、客户端的用户行为日志、软件的运行操作日志

可以将数据从数据源中采集并移动到另外一个目的地:

代码语言:javascript复制
数据源=>系统本地日志文件中的数据、jms、avro端口、kafka、系统本地目录下...
目的地=>hdfs、hive、hbase、kafka、系统本地一个文件中...
        

如何将linux本地的一个日志文件中的日志数据采集到hdfs上

  • 脚本 hdfs命令 =>【周期性】上传 #!/bin/sh HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2 $HADOOP_HOME/bin/hdfs -put /.../xx.log /hdfs 针对项目初期数据量较少时可以使用 , 没有容灾性及稳定性
  • 采用flume日志采集框架=>【实时】采集一个日志文件中实时追加的日志数据并写入到目的地 针对不同的应用场景定义并启动对应的flume-agent实例/进程 source -- 定义从哪里采集数据 exec类型的source可以借助Linux的shell命令实现实时读取一个日志文件中动态追加的日志数据 avro类型 …… channel -- 定义了source采集的数据临时存储地 memory 基于内存的管道容器 file 基于磁盘 sink -- 定义将数据最终写入的-目的地 hdfs类型的sink将数据最终写入到hdfs上 hive类型将数据最终写入到hive表 kafka类型将数据最终写入到kafka分布式消息队列中 ……

Flume-agent实例的模型

代码语言:javascript复制
每个flume-agent实例至少由以下三个功能模块组成
    source模块
        用于监控数据源并进行数据的实时采集,是实时产生数据流的模块
        数据源=>系统本地的一个日志文件中、kafka、jms、系统本地的一个目录下、avro端口  。。。
        source将采集到的数据提交到channel中
    channel模块
        用于连接source和sink的管道容器
        类似一个队列(FIFO)
    sink模块
        从channel中拉取take(剪切)数据并最终将数据写入到目的地
        目的地=>hdfs、hive、hbase、kafka、avro端口...
            
event事件:
    event事件是flume传输日志数据时基本单元,在flume-agent内部数据都是以事件形式存在
        source将采集到的数据封装成一个个的event事件,将事件提交到channel
        sink从channel消费事件并将事件中封装的数据最终写入到目的地
    event事件的数据结构:header   body
        header
            是一个map集合类型
            内部的key-value为该事件的元数据信息,主要用来区分不同的事件
        body
            是一个字节数组类型
            body为我们真正要传输的数据
        
        

Flume的安装使用

代码语言:javascript复制
                
flume-ng-1.6.0-cdh5.14.2

安装
    1、上次解压flume的安装包
        $ tar zxvf  /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/
        $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2  修改目录名称-可选
    2、修改flume配置文件
        $ mv conf/flume-env.sh.template conf/flume-env.sh  修改后环境配置文件才能生效
        $ vi conf/flume-env.sh
            export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112
    3、针对不同的场景需求配置对应的java属性配置文件并启动flume-agent进程
        
        如何启动一个flume-agent进程
        $ bin/flume-ng agent  
        --name或-n 当前flume-agent实例的别名  
        --conf或-c 当前flume框架的配置文件目录 
        --conf-file,-f 与当前要启动的flume-agent进程相匹配的java属性配置文件的本地路径
            
        Usage: bin/flume-ng <command> [options]...
        
            

案例

flume官方简单案例

定义一个flume-agent去监听读取某台服务器上的某个端口中的数据,并将监听读取到的数据最终写入到flume框架自己的日志文件中

代码语言:javascript复制
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = centos01
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
代码语言:javascript复制
        
提交测试:
    $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties &
确定目标服务器的端口是否已经成功被flume-agent代理进程监听
    $ netstat -antp |grep 44444     --查看端口信息
    $ ps -ef | grep flume  -- 查看进程信息
    
安装一个telnet工具并连接服务器端口写入数据
    $ sudo yum -y install telnet
    
    发送消息数据
    
检查flume的日志文件中的数据
    $ tail -f logs/flume.log
        
    

sources = exec

要求使用flume实时监控读取系统本地一个日志文件中动态追加的日志数据并实时写入到hdfs上的某个目录下

代码语言:javascript复制
# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停
a2.sources.r2.type = exec
# tail -F 文件名  即使没有这个-F后面指定的文件,命令也不会停止,容错能力强
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log


# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100


#声明a2的sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog
a2.sinks.k2.hdfs.filePrefix = nginxData


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

报错首先找logs目录

报错:找不到类

缺少jar包

代码语言:javascript复制
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/

sinks = hdfs 优化

解决生成的文件过多过小的问题(希望文件的大小=128M)

将日志文件按照日期分目录存储(按照天分目录存储)

将生成的日志文件的格式改为Text文本格式

修改上个例子的flume-agent属性文件

代码语言:javascript复制
# 声明当前flume-agent的别名及当前的flume-agent实例包含的模块的别名和个数
a2.sources = s2
a2.channels = c2
a2.sinks = k2

# 定义source模块中的s2的类型及与此类型相关的延伸属性
# exec类型的source可以借助执行一条linux shell命令实现读取linux系统上某个文件中的日志数据,其中 cat是一次性读取,tail可以实现实时读取新增加的数据
# shell属性用来声明要执行的命令的运行环境
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /opt/nginx/access.log
a2.sources.s2.shell = /bin/sh -c


# 定义channel模块中的c2的类型及与此类型相关的延伸属性
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100


# 定义sink模块中的k2的类型及与此类型相关的延伸属性
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d
#启用根据时间生成路径中的转义字符的具体的时间值
a2.sinks.k2.hdfs.round = true
#表示使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

#设置文件的前缀
a2.sinks.k2.hdfs.filePrefix = NgnixLog


#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1


#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
#从性能上考虑,最优的是batchSize=transactionCapacity
a2.sinks.k2.hdfs.batchSize = 100

# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a2.sinks.k2.hdfs.writeFormat = Text


# 将a2中的source及sink模块绑定到对应的channel模块上
# 一个source模块可以同时绑定多个channel模块,但是一个sink模块只能绑定一个唯一的channel
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2

sources = spooldir

利用flume监控某个目录下的日志文件,当某个目录下出现符合要求的文件名称的文件时,则对文件中的日志数据进行读取,并将数据最终写入到hdfs上

代码语言:javascript复制
目录
    /opt/data/logs
        nginx-access.log.2018120309
        nginx-access.log.2018120310
代码语言:javascript复制
# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
# includePattern 用正则表达式指定要包含的文件
# ignorePattern  用正则表达式指定要忽略的文件
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/chen/mylogs
# 由于每次读完会给读完的文件增加.COMPLETED从而形成新文件,需要忽略这些文件
a2.sources.r2.ignorePattern = ^.*.COMPLETED$
# includePattern和ignorePattern会同时生效
a2.sources.r2.includePattern =     ^.*$




# Use a channel
# file类型更安全
# memory类型效率更高
a2.channels.c2.type = file
a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data




#声明a2的sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d
#启用根据时间生成转义字符的具体的时间值
a2.sinks.k2.hdfs.round = true
#使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

a2.sinks.k2.hdfs.filePrefix = nginxData


#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1

#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
#从性能上考虑,最优的是batchSize=transactionCapacity
a2.sinks.k2.hdfs.batchSize = 100


# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a2.sinks.k2.hdfs.writeFormat = Text


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

一台汇总服务器

代码语言:javascript复制
需求:
Nginx服务器集群 -- 10台
    每台服务器上都有一个access.log日志文件
    需要将每台服务器上的日志文件中追加的日志数据实时读取并写入到hdfs上
    
    
    思路1:
        每台Nginx服务器上启动一个flume-agent
            source - exec
            channel - mem
            sink - hdfs
        多个flume-agent同时写入数据到hfds上不利于hdfs的稳定性

    思路2:
        每台Nginx服务器上启动一个flume-agent
            source - exec
            channel - mem
            sink - avro
                type = avro
                hostname = 主机名
                port =  端口号
            将数据统一写入到某台服务器某个端口中
            
        启动一个负责对汇总后的数据统一写入到目的地的flum-agent
            source - avro
                type = avro
                bind =
                port =
            channel - mem
            sink - hdfs

nginxs2flume.properties

代码语言:javascript复制
# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停
a2.sources.r2.type = exec
# tail -F 文件名  即使没有这个-F后面指定的文件,命令也不会停止,容错能力强
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log


# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100


#声明a2的sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = centos01
a2.sinks.k2.port = 6666


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

flume2hdfs.properties

代码语言:javascript复制
# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = centos01
a3.sources.r3.port = 6666


# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


#声明a3的sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test
a3.sinks.k3.hdfs.writeFormat = Text

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Flume Channel Selectors

作用:

代码语言:javascript复制
当一个flume-agent中存在多个channel时,为Source选择下游的Channel进行配置

类型一:Replicating Channel Selector (default) 复制模式

代码语言:javascript复制
    source将事件复制并提交给所有与其绑定的channel中
    默认属性可以省略定义
代码语言:javascript复制
    flume-agent
        source
            s1 -- 希望采集用户日志复制份数为n(n=channel的个数)分配到各个channel中
        channel
            c1
            c2
        sink
            k1 -- hdfs
            k2 -- kafka的某个主题中
代码语言:javascript复制
# 声明agent名称及此agent要使用的source、channel、sink名称
a2.sources = s2
a2.channels = c1 c2
a2.sinks = k1 k2

# 定义source类型及相关属性,需要声明下运行该命令的shell环境
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log
# 默认属性就是replicating
a2.sources.s2.selector.type = replicating





# 定义c1
# 定义channel类型及相关属性
a2.channels.c1.type = memory
# 管道的最大容量
a2.channels.c1.capacity = 1000
# 每次最大从source获取的事件数量和sink每次获取的event最大数量
a2.channels.c1.transactionCapacity = 100

# 定义c2
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100




# k1
# 定义kafka的sink
a2.sinks.k1.type = org.apache.flume.source.kafka.KafkaSource
a2.sinks.k1.kafka.bootstrap.servers = centos01:9092
a2.sinks.k1.kafka.topics = nginxTopic


# k2
# 定义sinks类型及相关属性
a2.sinks.k2.type = hdfs
#设置按天进行生成存储目录,每天生成一个文件夹
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/nginx/%Y%m%d/

#启用根据时间生成转义字符的值
a2.sinks.k2.hdfs.round = true
#使用本地时间戳作为基准
a2.sinks.k2.hdfs.useLocalTimeStamp = true

#设置文件的前缀
a2.sinks.k2.hdfs.filePrefix = flumeLog

#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1

#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
a2.sinks.k2.hdfs.batchSize = 100
# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a2.sinks.k2.hdfs.writeFormat = Text

# 绑定sink和source到channel
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
a2.sources.s2.channels = c1 c2

类型二:Multiplexing Channel Selector(网址案例)多路复用模式

代码语言:javascript复制
source根据事件的header头信息中的不同值提交给对应的channel中
事件中自带头信息
    source直接判断即可
    https://blog.csdn.net/looklook5/article/details/40430965--官方案例
事件没有头信息
    需要自定义header头信息,又称为定义拦截器
    http://www.cnblogs.com/Skyar/p/5831935.html    --案例
        使用正则匹配选择器类型对日志分解并指定key名称
        (\w )表示任意单词字符串,范围包括a-zA-Z0-9_
        file_roll类型本地目录提前创建,默认目录下每30s回滚一个文件
        使用echo "192.168.134.101:bigdata01:spark" >> test.log

mul_demo.properties

代码语言:javascript复制
#声明代理及三种模块的别名及数量
a1.sources = s1
a1.channels = c1 c2 c3 c4
a1.sinks = k1 k2 k3 k4


# 定义s1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /opt/flume/test.log

# 声明该source的channel selector类型
# multiplexing类型的channel selector会根据header中的某个key的值的不同提交给不同的channel
a1.sources.s1.selector.type = multiplexing
# 声明依据header头的key为course的值来判断提交该条数据给那些channel
# header map( k1->v1,k2->v2 ,k3->v3 )
a1.sources.s1.selector.header = course
# 如果该条数据中的header中的course的值为hadoop,则将该事件提交给c1
a1.sources.s1.selector.mapping.hadoop = c1
a1.sources.s1.selector.mapping.hive = c1 c2
a1.sources.s1.selector.mapping.hbase = c3
# 如果以上都未匹配到则默认提交给c4
a1.sources.s1.selector.default = c4

# 声明一个source的拦截器组件,别名叫 i1
a1.sources.s1.interceptors = i1

# 声明此拦截器的类型 ,regex_extractor为正则匹配拦截器类型
a1.sources.s1.interceptors.i1.type = regex_extractor

# (\w ) 表示可以匹配任意的字符串 范围包括 a-zA-Z0-9_
#  abc:erdfd:gsaafdf
# 将一条日志通过':'的分割并声明对应的key值,最终定义出一个包含三个key-value对的header头信息
# 192.168.134.101:bigdata01:spark  =>
# header{ip->192.168.134.101,domain->bigdata01,course->spark} body (192.168.134.101:bigdata01:spark )
a1.sources.s1.interceptors.i1.regex = (\w ):(\w ):(\w )
a1.sources.s1.interceptors.i1.serializers = s1 s2 s3
a1.sources.s1.interceptors.i1.serializers.s1.name = ip
a1.sources.s1.interceptors.i1.serializers.s2.name = domain
a1.sources.s1.interceptors.i1.serializers.s3.name = course


# 定义 c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 定义 c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# 定义 c3
a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100
# 定义 c4
a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100


# 定义k1
#file_roll类型为将事件写入到linux本地磁盘
a1.sinks.k1.type = file_roll
#定义将事件写入到本地的目录
a1.sinks.k1.sink.directory = /opt/flume/k1
a1.sinks.k1.sink.rollInterval = 0

# 定义k2
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /opt/flume/k2
a1.sinks.k2.sink.rollInterval = 0
# 定义k3
a1.sinks.k3.type = file_roll
a1.sinks.k3.sink.directory = /opt/flume/k3
a1.sinks.k3.sink.rollInterval = 0
# 定义k4
a1.sinks.k4.type = file_roll
a1.sinks.k4.sink.directory = /opt/flume/k4
a1.sinks.k4.sink.rollInterval = 0


# Bind the source and sink to the channel
a1.sources.s1.channels = c1 c2 c3 c4
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4



# 最终可以实现的需求

如果source采集的日志为
    192.168.134.101:bigdata01:spark  =》日志发送给 c4
    192.168.134.101:bigdata01:hive  =》日志发送给 c1 c2
    192.168.134.106:bigdata04:hbase  =》日志发送给 c3
    192.168.134.106:bigdata04:hadoop  =》日志发送给 c1
    192.168.134.106:bigdata04:oozie  =》日志发送给 c4
    


Flume Sink Processors

代码语言:javascript复制
作用:
    将一个flume-agent中的多个sink定义到一个Sinkgroups组中
    使组内多sink之间实现故障转移或负载均衡
类型:

Default Sink Processor

代码语言:javascript复制
        一个组内只有一个sink,不强制用户为Sink创建Processor
        就是之前的单sink案例

Failover Sink Processor

代码语言:javascript复制
        通过配置维护了多个sink组成的优先级列表
        需要为所有的sink分配优先级,所有的优先级数字必须是唯一的
        数字越大表示优先获取channel数据
        搭配 Replicating Channel Selector 使用

nginx2avro.properties

代码语言:javascript复制

#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名
a2.sources = s2
a2.channels = c1 c2
a2.sinks = k1 k2


## Failover Sink Processor
#声明一个由k1 k2组成的组,组名称为g1
a2.sinkgroups = g1
a2.sinkgroups.g1.sinks = k1 k2
#通过以下定义可以声明k1和k2的关系时故障转移关系(ha)
a2.sinkgroups.g1.processor.type = failover
#配置各个sink的优先级,只要有大小差别就行
a2.sinkgroups.g1.processor.priority.k1 = 10
a2.sinkgroups.g1.processor.priority.k2 = 5
#声明failover前最大等待时间,默认10s
a2.sinkgroups.g1.processor.maxpenalty = 10000


# 定义source模块的类型及此类型相关的其他属性
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log
a2.sources.s2.shell = /bin/sh -c

#定义c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#定义c2
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# 定义k1
# 注意:avro的归属服务器要在不同节点上,否则失去HA意义,端口号因为不在同一台服务器上所以可以相同
# 因为目前是伪分布,所以服务器只能一样并且端口号要区分开
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.134.101
a2.sinks.k1.port = 4545


# 定义k2
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = 192.168.134.101
a2.sinks.k2.port = 4546


# 将source和sink模块绑定到对应的channel管道上
a2.sources.s2.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

avro2hdfs-active.properties

代码语言:javascript复制
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名
a3.sources = s3
a3.sinks = k3
a3.channels = c3


# 定义source模块的类型及此类型相关的其他属性
a3.sources.s3.type = avro
a3.sources.s3.bind = 192.168.134.101
a3.sources.s3.port = 4545


#定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# 定义sink模块的类型及相关属性
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-active/%Y%m%d/

#启用根据时间生成转义字符的值及自动回滚生产日期目录
a3.sinks.k3.hdfs.round = true
#使用本地系统时间戳作为基准进行日期回滚
a3.sinks.k3.hdfs.useLocalTimeStamp = true

#设置文件的前缀,如果不设置则默认值为FlumeData
a3.sinks.k3.hdfs.filePrefix = HiveLog


#设置解决文件过多过小问题,将每个文件的大小控制在128M
#将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据
#rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息
a3.sinks.k3.hdfs.rollInterval = 0
a3.sinks.k3.hdfs.rollSize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a3.sinks.k3.hdfs.minBlockReplicas = 1


#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
#hdfs类型的sink将数据写入到hdsf上的底层源码执行过程
#假如batchSize=200,transactionCapacity=100
#系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException
a3.sinks.k3.hdfs.batchSize = 100

# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a3.sinks.k3.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a3.sinks.k3.hdfs.writeFormat = Text


# 将source和sink模块绑定到对应的channel管道上
a3.sources.s3.channels = c3
a3.sinks.k3.channel = c3

avro2hdfs-failover.properties

代码语言:javascript复制
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名
a4.sources = s4
a4.sinks = k4
a4.channels = c4

# 定义source模块的类型及此类型相关的其他属性
a4.sources.s4.type = avro
a4.sources.s4.bind = 192.168.134.101
a4.sources.s4.port = 4546

#定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

# 定义sink模块的类型及相关属性
a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-failover/%Y%m%d/

#启用根据时间生成转义字符的值及自动回滚生产日期目录
a4.sinks.k4.hdfs.round = true
#使用本地系统时间戳作为基准进行日期回滚
a4.sinks.k4.hdfs.useLocalTimeStamp = true

#设置文件的前缀,如果不设置则默认值为FlumeData
a4.sinks.k4.hdfs.filePrefix = HiveLog


#设置解决文件过多过小问题,将每个文件的大小控制在128M
#将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据
#rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息
a4.sinks.k4.hdfs.rollInterval = 0
a4.sinks.k4.hdfs.rollSize = 128000000
a4.sinks.k4.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a4.sinks.k4.hdfs.minBlockReplicas = 1


#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
#hdfs类型的sink将数据写入到hdsf上的底层源码执行过程
#假如batchSize=200,transactionCapacity=100
#系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException

a4.sinks.k4.hdfs.batchSize = 100

# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a4.sinks.k4.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a4.sinks.k4.hdfs.writeFormat = Text


# 将source和sink模块绑定到对应的channel管道上
a4.sources.s4.channels = c4
a4.sinks.k4.channel = c4

Load balancing Sink Processor

代码语言:javascript复制
可以配置同属一个组的多个sink之间负载平衡的能力
支持通过round_robin(轮询)或者random(随机)参数来实现事件的分发
默认情况下使用round_robin,也可以自定义分发机制
通常是多个sink绑定在同一个channel上

nginx2avro-balance.properties

代码语言:javascript复制
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名
#管道留c1即可!!!!!!!!!!!!!!!!!!
a2.sources = s2
a2.sinks = k1 k2
a2.channels = c1

#拷贝修改!!!!!!!!!!!!!!!!!!
## Load balancing Sink Processor
#声明一个由k1 k2组成的组,组名称为g1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
#是否以剔除失败的Sinks。
a1.sinkgroups.g1.processor.backoff = true
#采用何种负载均衡算法,round_robin, random或自定义
a1.sinkgroups.g1.processor.selector = round_robin

# 定义source模块的类型及此类型相关的其他属性
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log
a2.sources.s2.shell = /bin/sh -c

#定义c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 定义k1
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.134.101
a2.sinks.k1.port = 4546

# 定义k2
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = 192.168.134.101
a2.sinks.k2.port = 4545

# 将source和sink模块绑定到对应的channel管道上
# 修改c2为c1!!!!!!!!!!!!!!!!!!
a2.sources.s2.channels = c1
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c1

avro2hdfs-01.properties

代码语言:javascript复制
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名
a3.sources = s3
a3.sinks = k3
a3.channels = c3


# 定义source模块的类型及此类型相关的其他属性
a3.sources.s3.type = avro
a3.sources.s3.bind = 192.168.134.101
a3.sources.s3.port = 4545


#定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# 定义sink模块的类型及相关属性
#修改路径!!!!!!!!!!!!!!!!!!
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume-01/%Y%m%d/

#启用根据时间生成转义字符的值及自动回滚生产日期目录
a3.sinks.k3.hdfs.round = true
#使用本地系统时间戳作为基准进行日期回滚
a3.sinks.k3.hdfs.useLocalTimeStamp = true

#设置文件的前缀,如果不设置则默认值为FlumeData
a3.sinks.k3.hdfs.filePrefix = HiveLog


#设置解决文件过多过小问题,将每个文件的大小控制在128M
#将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据
#rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息
a3.sinks.k3.hdfs.rollInterval = 0
a3.sinks.k3.hdfs.rollSize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a3.sinks.k3.hdfs.minBlockReplicas = 1


#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
#hdfs类型的sink将数据写入到hdsf上的底层源码执行过程
#假如batchSize=200,transactionCapacity=100
#系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException

a3.sinks.k3.hdfs.batchSize = 100

# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a3.sinks.k3.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a3.sinks.k3.hdfs.writeFormat = Text


# 将source和sink模块绑定到对应的channel管道上
a3.sources.s3.channels = c3
a3.sinks.k3.channel = c3

avro2hdfs-02.properties

代码语言:javascript复制
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名
a4.sources = s4
a4.sinks = k4
a4.channels = c4

# 定义source模块的类型及此类型相关的其他属性
a4.sources.s4.type = avro
a4.sources.s4.bind = 192.168.134.101
a4.sources.s4.port = 4546

#定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

# 定义sink模块的类型及相关属性
#修改路径!!!!!!!!!!!!!!!!!!
a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume02/%Y%m%d/

#启用根据时间生成转义字符的值及自动回滚生产日期目录
a4.sinks.k4.hdfs.round = true
#使用本地系统时间戳作为基准进行日期回滚
a4.sinks.k4.hdfs.useLocalTimeStamp = true

#设置文件的前缀,如果不设置则默认值为FlumeData
a4.sinks.k4.hdfs.filePrefix = HiveLog


#设置解决文件过多过小问题,将每个文件的大小控制在128M
#将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据
#rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息
a4.sinks.k4.hdfs.rollInterval = 0
a4.sinks.k4.hdfs.rollSize = 128000000
a4.sinks.k4.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a4.sinks.k4.hdfs.minBlockReplicas = 1


#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值
#hdfs类型的sink将数据写入到hdsf上的底层源码执行过程
#假如batchSize=200,transactionCapacity=100
#系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException

a4.sinks.k4.hdfs.batchSize = 100

# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a4.sinks.k4.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法)
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容
a4.sinks.k4.hdfs.writeFormat = Text


# 将source和sink模块绑定到对应的channel管道上
a4.sources.s4.channels = c4
a4.sinks.k4.channel = c4

— THE END —

0 人点赞