大数据日志收集框架之Flume实战

2021-04-27 14:21:43 浏览数 (1)

一、环境准备

flume官方文档:http://flume.apache.org/documentation.html

1、安装包下载

jdk1.8

flume1.9.0:http://flume.apache.org/download.html

2、安装flume

代码语言:javascript复制
tar zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/
代码语言:javascript复制
ln -s apache-flume-1.9.0-bin flume

3、修改配置文件

代码语言:javascript复制
cd /usr/local/flume/conf
代码语言:javascript复制
cp flume-conf.properties.template flume-conf.properties
cp flume-env.ps1.template flume-env.ps1
cp flume-env.sh.template flume-env.sh

 二、配置环境变量

1、配置java环境变量

代码语言:javascript复制
export JAVA_HOME=/usr/java/jdk1.8.0_241-amd64
export PATH=$PATH:$JAVA_HOME/bin

2、配置flume环境变量

代码语言:javascript复制
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin

三、flume source

1、netcat source

/usr/local/flume 目录下创建 example.conf 文件,文件内容如下

source类型为监控端口,sink类型为日志输出,channel类型为内存,channel的最大存储event数量为1000,每次source发送或者sink接收event的数量为100

代码语言:javascript复制
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume agent,配置文件为 example.conf ,agent名称为 a1 ,以日志形式在控制台显示接收source消息 

代码语言:javascript复制
flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

也可以使用命令简令, -c 指定flume的配置目录,-f 指定定义组件的配置文件 -n 指定组件中agent的名称,-Dflume.root.logger=INFO,console为flume的运行日志

代码语言:javascript复制
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example.conf -n a1 -Dflume.root.logger=INFO,console
代码语言:javascript复制
telnet localhost 44444

 效果如图 ,sink监听本机44444端口,使用telnet向本机44444端口发送消息模拟source端发送消息,可以看到sink端以控制台日志的形式接收了source端的消息发送

flume还支持配置文件使用环境变量,仅限于值使用,变量也可以通过 conf/flume-env.sh 文件配置

将 example.conf source监听的端口 修改为 

代码语言:javascript复制
a1.sources.r1.port = ${BIND_PORT}

需要添加参数 -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

代码语言:javascript复制
BIND_PORT=44444 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

2、avro source

在flume根目录新建 example文件夹,将 example.conf 文件复制为 netcat_source.conf 文件

代码语言:javascript复制
cd $FLUME_HOME && mkdir example
代码语言:javascript复制
mv $FLUME_HOME/example.conf $FLUME_HOME/example && cp $FLUME_HOME/example.conf  $FLUME_HOME/example/netcat_source.conf
代码语言:javascript复制
cd $FLUME_HOME/example && cp example.conf && vim avro_source.conf

修改 avro_source.conf 为

代码语言:javascript复制
a1.sources.r1.type = avro
a1.sources.r1.bind = ${BIND_IP}
a1.sources.r1.port = ${BIND_PORT}

启动 Agent 

代码语言:javascript复制
BIND_IP=localhost BIND_PORT=55555 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/avro_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

启动 Avro Client

代码语言:javascript复制
flume-ng avro-client -c $FLUME_HOME/conf -H localhost -p 55555 -F /etc/profile

3、exec source

代码语言:javascript复制
cd $FLUME_HOME/example && cp example.conf exec_tail_source.conf && vim exec_tail_source.conf

复制 example.conf 文件为 exec_tail_source.conf,修改以下内容为

代码语言:javascript复制
a1.sources.r1.type = exec
a1.sources.r1.bind = ${BIND_IP}
a1.sources.r1.port = ${BIND_PORT}
a1.sources.r1.command = tail -F ${FLUME_HOME}/example/test.log

启动 Agent 

代码语言:javascript复制
BIND_IP=localhost BIND_PORT=55555 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/exec_tail_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

向监控的文件写入数据

代码语言:javascript复制
ping 127.0.0.1 >> ${FLUME_HOME}/example/test.log
代码语言:javascript复制
tail -F ${FLUME_HOME}/example/test.log

4、spooldir Source

代码语言:javascript复制
cd $FLUME_HOME/example && cp example.conf spooldir_source.conf && vim spooldir_source.conf

复制 example.conf 文件为 exec_tail_source.conf,修改以下内容为

代码语言:javascript复制
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = ${FLUME_HOME}/example/test_spooldir
a1.sources.r1.fileSuffix = .csv
a1.sources.r1.fileHeader = true
# a1.sources.r1.bind = ${BIND_IP}
# a1.sources.r1.port = ${BIND_PORT}

启动 Agent 

代码语言:javascript复制
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/spooldir_source.conf -n a1 -Dflume.root.logger=INFO,console

写入文件

代码语言:javascript复制
cd $FLUME_HOME/example/test_spooldir
echo 111 >> 1.txt
echo 222 >> 2.txt
ll

5、thrift source

代码语言:javascript复制
cd $FLUME_HOME/example && cp example.conf thrift_source.conf && vim thrift_source.conf

复制 example.conf 文件为 thrift.conf,修改以下内容为

代码语言:javascript复制
a1.sources.r1.type = thrift
a1.sources.r1.bind = ${BIND_IP}
a1.sources.r1.port = ${BIND_PORT}

启动 Agent 

代码语言:javascript复制
BIND_IP=0.0.0.0 BIND_PORT=55555 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/thrift_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

发送数据

代码语言:javascript复制
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.util.Arrays;

//import org.apache.flume.api.SecureRpcClientFactory;

public class MyFlumeRpcClient01 {

    public static void main(String[] args) {

//        String hostname = "127.0.0.1";
        String hostname = "192.168.0.181";
        int port = 55555;

        System.out.println((null==args)   "t"   (String.valueOf(args.length)));
        System.out.println(Arrays.toString(args));

        if(null!=args && args.length!=0) {
            hostname = args[0];
            port = Integer.valueOf(args[1]);
        }

        System.out.println(hostname);
        System.out.println(port);

        RpcClient client =  RpcClientFactory.getThriftInstance(hostname, port);
        Event event;

        for(int i=0;i<10;i  ) {
            event = EventBuilder.withBody(String.valueOf(System.currentTimeMillis()), Charset.forName("UTF-8"));
            try {
                client.append(event);
                Thread.sleep(600);
            } catch (EventDeliveryException | InterruptedException e) {
                e.printStackTrace();
                client.close();
                client =  RpcClientFactory.getThriftInstance(hostname, port);
            }
        }
        client.close();

    }
}

Maven配置

代码语言:javascript复制
      org.apache.flume
      flume-ng-core
      1.9.0
    
    
    
      org.apache.flume
      flume-ng-sdk
      1.9.0

Maven打包

代码语言:javascript复制
mvn clean package -DskipTest

 执行 java -cp 命令

代码语言:javascript复制
java -cp real-time-1.0-jar-with-dependencies.jar com.xtd.java.flume.MyFlumeRpcClient01

flume控制台接收thrift发送的时间戳数据 

6、JMS Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

7、Kafka Source

代码语言:javascript复制
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
代码语言:javascript复制
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

8、NetCat TCP Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

9、NetCat UDP Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

9、Sequence Generator Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

10、Syslog TCP Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

11、Multiport Syslog TCP Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

12、Syslog UDP Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

13、HTTP Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300

14、Stress Source

代码语言:javascript复制
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

15、Avro Legacy Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

16、Thrift Legacy Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

17、Custom Source

代码语言:javascript复制
 a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

18、Scribe Source

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

 四、flume sink

1、hdfs sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

2、hive sink

hive sink需要定义的内容比较多,根据表的字段,分区以及分隔符的不同设置相应与之变化,如下hive建表

代码语言:javascript复制
create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

 hive_sink.conf

代码语言:javascript复制
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "t"
a1.sinks.k1.serializer.serdeSeparator = 't'
a1.sinks.k1.serializer.fieldnames =id,,msg

3、logger sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

4、avro sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

5、Thrift Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

6、IRC Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

7、File Roll Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

8、Null Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

9、HBase1Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

10、HBase2Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1

10、AsyncHBaseSink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

11、MorphlineSolrSink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

12、ElasticSearchSink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

13、Kite Dataset Sink

14、Kafka Sink

代码语言:javascript复制
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

15、TSL Kafka Sink

代码语言:javascript复制
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = 

16、HTTP Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

17、Custom Sink

代码语言:javascript复制
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

18、自定义source和sink

参考官网开发者文档:http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html

五、flume channel

1、Memory Channel

代码语言:javascript复制
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

2、JDBC Channel

代码语言:javascript复制
a1.channels = c1
a1.channels.c1.type = jdbc

3、Kafka Channel

代码语言:javascript复制
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

4、TSL Kafka Channel

代码语言:javascript复制
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = 
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = 

5、File Channel

代码语言:javascript复制
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

6、Spillable Memory Channel

代码语言:javascript复制
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

7、Pseudo Transaction Channel

代码语言:javascript复制
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

六、Flume Channel Selector

1、Replicating Channel Selector (default)

代码语言:javascript复制
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

2、Multiplexing Channel Selector

代码语言:javascript复制
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

3、Custom Channel Selector

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

七、Flume Sink Processors

1、Default Sink Processor

代码语言:javascript复制
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

2、Failover Sink Processor

代码语言:javascript复制
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

3、Load balancing Sink Processor

代码语言:javascript复制
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

4、Body Text Serializer

代码语言:javascript复制
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

八、Flume Event Serializers

1、Body Text Serializer

代码语言:javascript复制
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

2、“Flume Event” Avro Event Serializer

代码语言:javascript复制
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

3、Avro Event Serializer

代码语言:javascript复制
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

九、Flume Interceptors

1、default interceptor

代码语言:javascript复制
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

2、Timestamp Interceptor

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

3、Host Interceptor

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

4、Static Interceptor

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

5、Remove Header Interceptor

6、UUID Interceptor

7、Morphline Interceptor

代码语言:javascript复制
a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

8、Search and Replace Interceptor

代码语言:javascript复制
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_] 
a1.sources.avroSrc.interceptors.search-replace.replaceString =
代码语言:javascript复制
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z] ) jumped over the lazy ([a-z] )
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

9、Regex Filtering Interceptor

10、Regex Extractor Interceptor

代码语言:javascript复制
a1.sources.r1.interceptors.i1.regex = (\d):(\d):(\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
代码语言:javascript复制
a1.sources.r1.interceptors.i1.regex = ^(?:\n)?(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

十、Flume Properties

1、Environment Variable Config Filter

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = env

a1.sources.r1.channels =  c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123

2、External Process Config Filter

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/passwordResolver.sh
a1.configfilters.f1.charset = UTF-8

a1.sources.r1.channels =  c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123
代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/generateUniqId.sh
a1.configfilters.f1.charset = UTF-8

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume/agent_${f1['agent_name']} # will be /var/log/flume/agent_1234

3、Hadoop Credential Store Config Filter

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = hadoop
a1.configfilters.f1.credential.provider.path = jceks://file/

a1.sources.r1.channels =  c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value from the credential store

4、Log4J Appender

代码语言:javascript复制
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
代码语言:javascript复制
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

5、Load Balancing Log4J Appender

代码语言:javascript复制
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
代码语言:javascript复制
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
代码语言:javascript复制
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

0 人点赞