采集文件到HDFS

2023-02-25 15:18:01 浏览数 (1)

采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

根据需求,首先定义以下3大要素 ● 采集源,即source——监控文件内容更新 : exec ‘tail -F file’ 即时输出文件变化后追加的数据。 tail -f file 动态跟踪文件file的增长情况,tail会每隔一秒去检查一下文件是否增加新的内容。如果增加就追加在原来的输出后面显示。但这种情况,必须保证在执行tail命令时,文件已经存在。 ● 下沉目标,即sink——HDFS文件系统 : hdfs sink ● Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel

配置文件编写:

代码语言:javascript复制
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /root/input/input_log
agent1.sources.source1.channels = channel1

#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://hq555/flume/%y-%m-%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1    

开启

代码语言:javascript复制
bin/flume-ng agent -c conf -f conf/exec_log.conf -n a1  -Dflume.root.logger=INFO,console

补充说明

rollSize

代码语言:javascript复制
默认值:1024,当临时文件达到该大小(单位:bytes)时,滚动成目标文件。如果设置成0,则表示不根据临时文件大小来滚动文件。

rollCount

代码语言:javascript复制
默认值:10,当events数据达到该数量时候,将临时文件滚动成目标文件,如果设置成0,则表示不根据events数据来滚动文件。

round

代码语言:javascript复制
默认值:false,是否启用时间上的”舍弃”,类似于”四舍五入”,如果启用,则会影响除了%t的其他所有时间表达式;

roundValue

代码语言:javascript复制
默认值:1,时间上进行“舍弃”的值;

roundUnit

代码语言:javascript复制
默认值:seconds,时间上进行”舍弃”的单位,包含:second,minute,hour

0 人点赞