前言
全局命令
在环境变量中增加如下命令,可以使用 bd
快速切换到 /data/tools/bigdata
cd /etc/profile.d/
vi bd.sh
内容如下
代码语言:javascript复制alias bd='cd /data/tools/bigdata'
配置生效
代码语言:javascript复制source /etc/profile
Flume的安装
下载地址
https://dlcdn.apache.org/flume/1.9.0/
上传至虚拟机,并解压
代码语言:javascript复制tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /data/tools/bigdata
环境变量
创建配置文件
代码语言:javascript复制vi /etc/profile.d/flume.sh
内容设置为
代码语言:javascript复制# FLUME
export FLUME_HOME=/data/tools/bigdata/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
配置生效
代码语言:javascript复制source /etc/profile
查看是否生效
代码语言:javascript复制echo $FLUME_HOME
查看flume版本
代码语言:javascript复制flume-ng version
测试flume
监控一个目录,将数据打印出来
配置文件
代码语言:javascript复制vi $FLUME_HOME/conf/spoolingtest.conf
内容如下
代码语言:javascript复制# a1表示agent的名字 可以自定义
# 给sources(在一个agent里可以定义多个source)取个名字
a1.sources = r1
# 给channel个名字
a1.channels = c1
# 给channel个名字
a1.sinks = k1
# 对source进行配置
# agent的名字.sources.source的名字.参数 = 参数值
# source的类型 spoolDir(监控一个目录下的文件的变化)
a1.sources.r1.type = spooldir
# 监听哪一个目录
a1.sources.r1.spoolDir = /root/data
# 是否在event的headers中保存文件的绝对路径
a1.sources.r1.fileHeader = true
# 给拦截器取个名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp拦截器,将处理数据的时间保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink为logger
# 直接打印到控制台
a1.sinks.k1.type = logger
# 将source、channel、sink组装成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
新建/root/data目录
代码语言:javascript复制mkdir /root/data
启动agent
代码语言:javascript复制flume-ng agent -n a1 -f $FLUME_HOME/conf/spoolingtest.conf -Dflume.root.logger=DEBUG,console
在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志
随意在test.txt中加入一些内容
代码语言:javascript复制vi /root/data/test.txt
我们会发现它做了如下操作
- 会把文件中的每一行都打印出来
- 打印完成后文件重命名为
test.txt.COMPLETED
flume的使用
系统文件到HDFS
创建配置文件
代码语言:javascript复制vi $FLUME_HOME/conf/spoolingToHDFS.conf
配置文件
代码语言:javascript复制# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /root/data
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = student
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
在 /root/data/目录下准备数据
代码语言:javascript复制vi /root/data/study.txt
内容如下
代码语言:javascript复制good good study
day day up
启动agent
如果HDFS没有启动要先启动HDFS
代码语言:javascript复制flume-ng agent -n a -f $FLUME_HOME/conf/spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
启动HDFS
代码语言:javascript复制$HADOOP_HOME/sbin/start-dfs.sh
停止HDFS
代码语言:javascript复制$HADOOP_HOME/sbin/stop-dfs.sh
可以从网址中查看文件保存情况(192.168.160.130是我的服务器IP)
http://192.168.160.130:50070/explorer.html#/
HBase日志到HDFS
代码语言:javascript复制vi $FLUME_HOME/conf/hbaseLogToHDFS.conf
配置文件
代码语言:javascript复制# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = tail -f /data/tools/bigdata/hbase-2.4.11/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = hbaselog
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动
代码语言:javascript复制flume-ng agent -n a -f $FLUME_HOME/conf/hbaseLogToHDFS.conf -Dflume.root.logger=DEBUG,console
HBase日志到HBase
创建配置文件
代码语言:javascript复制vi $FLUME_HOME/conf/hbaselogToHBase.conf
配置文件
代码语言:javascript复制# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = cat /data/tools/bigdata/hbase-2.4.11/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hbase
a.sinks.k1.table = t_log
a.sinks.k1.columnFamily = cf1
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
在hbase中创建log表
代码语言:javascript复制create 't_log','cf1'
启动
代码语言:javascript复制flume-ng agent -n a -f $FLUME_HOME/conf/hbaselogToHBase.conf -Dflume.root.logger=DEBUG,console
网络日志获取
创建配置文件
代码语言:javascript复制vi $FLUME_HOME/conf/netcatLogger.conf
配置文件
代码语言:javascript复制# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = netcat
a.sources.r1.bind = 0.0.0.0
a.sources.r1.port = 8888
#指定sink的类型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动
先启动agent
代码语言:javascript复制flume-ng agent -n a -f $FLUME_HOME/conf/netcatLogger.conf -Dflume.root.logger=DEBUG,console
新打开一个控制台
安装telnet
代码语言:javascript复制yum install telnet
再启动telnet
代码语言:javascript复制telnet localhost 8888
网络请求日志
创建配置文件
代码语言:javascript复制vi $FLUME_HOME/conf/httpToLogger.conf
配置文件
代码语言:javascript复制# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = http
a.sources.r1.port = 6666
#指定sink的类型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
先启动agent
代码语言:javascript复制flume-ng agent -n a -f $FLUME_HOME/conf/httpToLogger.conf -Dflume.root.logger=DEBUG,console
再使用curl发起一个http请求
新打开一个控制台
代码语言:javascript复制curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://localhost:6666