写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
在差不多一年前,菌刚接触Flume那会,写了一篇关于Flume的博客。今天无意间翻到,才发现当时介绍的内容是多么的浅显,于是菌打算再为大家介绍如何在Flume中实现过滤器的操作。
码字不易,先赞后看!
Flume过滤器
1、案例场景
A、B两台日志服务机器实时生产日志主要类型为 access.log
、nginx.log
、web.log
现在要求:
把A、B 机器中的access.log
、nginx.log
、web.log
采集汇总到C机器上然后统一收集到hdfs中。
但是在hdfs中要求的目录为:
/source/logs/access/20180101/** /source/logs/nginx/20180101/** /source/logs/web/20180101/**
2、场景分析
3、数据流程处理分析
4、实现
服务器A对应的IP为 192.168.100.100 服务器B对应的IP为 192.168.100.110 服务器C对应的IP为 192.168.100.120
采集端配置文件开发
node01与node02服务器开发flume的配置文件
代码语言:javascript复制[root@node01 ~]# cd /export/servers/apache-flume-1.8.0-bin/conf
[root@node01 conf]# vim exec_source_avro_sink.conf
代码语言:javascript复制a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/taillogs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
## static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
注:
定义拦截器的类型 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static
需要注意的是:static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value键值对
例如:
a1.sources.r1.interceptors.i1.key = type a1.sources.r1.interceptors.i1.value = access
服务端配置文件开发
在node03上面开发flume配置文件
代码语言:javascript复制[root@node03 ~]# cd /export/servers/apache-flume-1.8.0-bin/conf
[root@node03 conf]# vim avro_source_hdfs_sink.conf
代码语言:javascript复制a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.100.120
a1.sources.r1.port =41414
#添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.100.100:8020/source/logs/%{type}/%Y%m%d
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
采集端文件生成脚本
为了方便观察采集的结果,我们分别在node01和node02上开发shell脚本,模拟数据生成。
代码语言:javascript复制[root@node01 servers]# cd /export/shells/
[root@node01 shells]# vim server.sh
代码语言:javascript复制#!/bin/bash
while true
do
date >> /export/taillogs/access.log;
date >> /export/taillogs/web.log;
date >> /export/taillogs/nginx.log;
sleep 0.5;
done
顺序启动服务
node03启动flume实现数据收集
代码语言:javascript复制[root@node03 conf]# cd /export/servers/apache-flume-1.8.0-bin
[root@node03 apache-flume-1.8.0-bin]# bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
node01与node02启动flume实现数据监控
代码语言:javascript复制[root@node01 shells]# cd /export/servers/apache-flume-1.8.0-bin
[root@node01 apache-flume-1.8.0-bin]# bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
node01 与 node02 启动生成文件脚本
代码语言:javascript复制cd /export/shells
sh server.sh
5、效果实现截图
查看hdfs指定的输出路径下的目录构成
查看某个目录下的文件构成
可以发现随着我们shell脚本的启动,数据被不断的追加到指定的监控文件中,node01和node02在检测到变化之后,将变化的内容在node03进行汇总,然后node03根据定义的不同生产日志类型,对于进行“过滤”输出到HDFS的不同目录下。
小结
本篇博客作者简单为大家介绍了一种Flume拦截器的使用示例,如果有看不太懂的小伙伴建议与这篇文章一起食用。当然,都看过的小伙伴也许也会感觉还是少了些内容,例如Flume的自定义监控,选择器,Sink组,还有Flume的web端监控——Ganglia等等…大家Duck不必担心,接下来的几篇博客,博主将详细介绍这些关于Flume的“干货”,敬请期待!!!
受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?
希望我们都能在学习的道路上越走越远?