基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

2023-10-17 08:51:00 浏览数 (2)

04:数据源

目标了解数据源的格式及实现模拟数据的生成

路径

  • step1:数据格式
  • step2:数据生成

实施

数据格式

消息时间

发件人昵称

发件人账号

发件人性别

发件人IP

发件人系统

发件人手机型号

发件人网络制式

发件人GPS

收件人昵称

收件人IP

收件人账号

收件人系统

收件人手机型号

收件人网络制式

收件人GPS

收件人性别

消息类型

双方距离

消息

msg_time

sender_nickyname

sender_account

sender_sex

sender_ip

sender_os

sender_phone_type

sender_network

sender_gps

receiver_nickyname

receiver_ip

receiver_account

receiver_os

receiver_phone_type

receiver_network

receiver_gps

receiver_sex

msg_type

distance

message

2020/05/08 15:11:33

古博易

14747877194

48.147.134.255

Android 8.0

小米 Redmi K30

4G

94.704577,36.247553

莱优

97.61.25.52

17832829395

IOS 10.0

Apple iPhone 10

4G

84.034145,41.423804

TEXT

77.82KM

天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。

数据生成

创建原始文件目录

代码语言:javascript复制
mkdir /export/data/momo_init

上传模拟数据程序

代码语言:javascript复制
cd /export/data/momo_init
rz

创建模拟数据目录

代码语言:javascript复制
mkdir /export/data/momo_data

运行程序生成数据

语法

代码语言:javascript复制
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间

测试:每500ms生成一条数据

代码语言:javascript复制
java -jar /export/data/momo_init/MoMo_DataGen.jar 
/export/data/momo_init/MoMo_Data.xlsx 
/export/data/momo_data/ 
500

结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为01

小结

  • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型
  • 路径
    • step1:需求分析
    • step2:技术选型
    • step3:技术架构
  • 实施
    • 需求分析
      • 离线存储计算
        • 提供离线T 1的统计分析
        • 提供离线数据的即时查询
      • 实时存储计算
        • 提供实时统计分析
    • 技术选型
      • 离线
        • 数据采集:Flume
        • 离线存储:Hbase
        • 离线分析:Hive:复杂计算
        • 即时查询:Phoenix:高效查询
      • 实时
        • 数据采集:Flume
        • 实时存储:Kafka
        • 实时计算:Flink
        • 实时应用:MySQL FineBI 或者 Redis JavaWeb可视化
    • 技术架构
    • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
      • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
      • 保证数据一致性
  • 小结
    • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

目标回顾Flume基本使用及实现Flume的安装测试

路径

  • step1:Flume回顾
  • step2:Flume的安装
  • step3:Flume的测试

实施

Flume的回顾

  • 功能:实时对文件或者网络端口进行数据流监听采集
  • 场景:文件实时采集
  • 开发
    • step1:先开发一个配置文件:properties【K=V】
    • step2:运行这个文件即可
  • 组成
    • Agent:一个Agent就是一个Flume程序
    • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
    • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
    • Sink:负责从Channel拉取数据写入目标地
    • Event:代表一条数据对象
      • head:Map集合[KV]
      • body:byte[]

Flume的安装

上传安装包

代码语言:javascript复制
cd /export/software/
rz

解压安装

代码语言:javascript复制
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin

修改配置

代码语言:javascript复制
#集成HDFS,拷贝HDFS配置文件
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
#修改Flume环境变量
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh 
代码语言:javascript复制
#修改22行
export JAVA_HOME=/export/server/jdk1.8.0_65
#修改34行
export HADOOP_HOME=/export/server/hadoop-3.3.0

删除Flume自带的guava包,替换成Hadoop的

代码语言:javascript复制
cd /export/server/flume-1.9.0-bin 
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/

创建目录

代码语言:javascript复制
cd /export/server/flume-1.9.0-bin
#程序配置文件存储目录
mkdir usercase
#Taildir元数据存储目录
mkdir position

Flume的测试

需求:采集聊天数据,写入HDFS

分析

  • Source:taildir:动态监听多个文件实现实时数据采集
  • Channel:mem:将数据缓存在内存
  • Sink:hdfs

开发

代码语言:javascript复制
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
代码语言:javascript复制
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1

#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true

#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 102400
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动HDFS

代码语言:javascript复制
start-dfs.sh

运行Flume

代码语言:javascript复制
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console

运行模拟数据

代码语言:javascript复制
java -jar /export/data/momo_init/MoMo_DataGen.jar 
/export/data/momo_init/MoMo_Data.xlsx 
/export/data/momo_data/ 
100

查看结果

小结

  • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

目标实现案例Flume采集程序的开发

路径

  • step1:需求分析
  • step2:程序开发
  • step3:测试实现

实施

需求分析

需求:采集聊天数据,实时写入Kafka

Source:taildir

Channel:mem

Sink:Kafka sink

代码语言:javascript复制
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

程序开发

代码语言:javascript复制
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
代码语言:javascript复制
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1

#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true

#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100

#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

测试实现

启动Kafka

代码语言:javascript复制
start-zk-all.sh
start-kafka.sh 

创建Topic

代码语言:javascript复制
kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

列举

代码语言:javascript复制
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092

启动消费者

代码语言:javascript复制
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092

启动Flume程序

代码语言:javascript复制
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console

启动模拟数据

代码语言:javascript复制
java -jar /export/data/momo_init/MoMo_DataGen.jar 
/export/data/momo_init/MoMo_Data.xlsx 
/export/data/momo_data/ 
50

观察结果

小结

  • 实现案例Flume采集程序的开发

0 人点赞