Data Warehouse in Practice 2


Let's continue with the data collection module.

Collecting Log Data into Kafka with Flume

The first step is to use Flume to collect the log data into Kafka.

Configuration

For log collection we use Flume, a traditional and mature log collection project.

Flume tails the log files that are generated in real time and ships them into Kafka. The log files are named in the format app-yyyy-mm-dd.log.

Flume was removed in CDH 7.1.1 and replaced by NiFi, so we download Flume directly onto the node ourselves. The detailed configuration is given below.
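For reference, a typical way to fetch Flume 1.9.0 from the Apache archive and unpack it under /data0 (adjust the paths to your environment; the URL below is the standard Apache archive location, not something specific to this cluster) is:

cd /data0
wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz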

Create a file named file-flume-kafka.conf in the /data0/apache-flume-1.9.0-bin/conf directory:

[root@cdh3 conf]# cat file-flume-kafka.conf 
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data0/apache-flume-1.9.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = com.soundhearer.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.soundhearer.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

com.soundhearer.flume.interceptor.LogETLInterceptor and com.soundhearer.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors. Change them to match your own interceptor implementations.

Flume ETL and Log-Type Interceptors

This project defines two custom interceptors: an ETL interceptor and a log-type interceptor.

The ETL interceptor filters out log lines whose timestamp is invalid or whose JSON payload is incomplete.
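The actual code lives in the GitHub repository mentioned below; as a rough illustration of the kind of validation involved, a helper like the following could check an event line (a 13-digit millisecond timestamp, a '|' separator, and a JSON body). The class and method names here are illustrative only, not the project's:

// Simplified sketch of the ETL-style validation: keep an event line only if it
// has a 13-digit millisecond timestamp, a '|' separator, and a JSON body
// wrapped in { }. Start logs are just checked for JSON completeness.
public class LogValidator {

    public static boolean validateEvent(String log) {
        if (log == null) return false;
        String[] parts = log.split("\\|", 2);            // "timestamp|json"
        if (parts.length != 2) return false;
        String ts = parts[0].trim();
        if (ts.length() != 13 || !ts.chars().allMatch(Character::isDigit)) return false;
        String json = parts[1].trim();
        return json.startsWith("{") && json.endsWith("}"); // crude completeness check
    }

    public static boolean validateStart(String log) {
        if (log == null) return false;
        String json = log.trim();
        return json.startsWith("{") && json.endsWith("}");
    }
}

Inside the interceptor itself, a line that fails such a check is simply dropped: the single-event intercept method returns null, and the list version filters those events out.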

The log-type interceptor distinguishes startup logs from event logs so they can be routed to different Kafka topics.
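To show how the topic header consumed by the multiplexing selector above gets set, here is a minimal sketch of what such a type interceptor could look like, assuming the standard Flume Interceptor API; the real LogTypeInterceptor on GitHub may differ in detail:

package com.soundhearer.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Minimal sketch: tag each event with a "topic" header so the multiplexing
// channel selector can route start logs to c1 and event logs to c2.
public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, String> headers = event.getHeaders();
        // Startup logs contain "start"; everything else is treated as an event log.
        headers.put("topic", body.contains("start") ? "topic_start" : "topic_event");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() { }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}

With the Builder wired into the configuration above, start logs end up in channel c1 (topic_start) and all other logs in c2 (topic_event).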

The full code is available on my GitHub.

Creating the Kafka Topics

We create the two topics from the command line.

kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_start --partitions 3 --replication-factor 2
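topic_event is created the same way, changing only the topic name:

kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_event --partitions 3 --replication-factor 2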

List the topics:

[root@cdh3 conf]# kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --list
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh2.macro.com/192.168.0.207:2181, sessionid = 0x1007949b99a034d, negotiated timeout = 30000
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Connected.
ATLAS_ENTITIES
ATLAS_HOOK
ATLAS_SPARK_HOOK
__consumer_offsets
fill
topic_event
topic_start
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closing.
20/11/24 18:41:34 INFO zookeeper.ZooKeeper: Session: 0x1007949b99a034d closed
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x1007949b99a034d
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closed.

Estimating the Number of Kafka Machines

Number of Kafka machines (rule of thumb) = 2 × (peak production rate × number of replicas / 100) + 1

Once you know the peak production rate and have chosen the number of replicas, you can estimate how many Kafka machines need to be deployed.

For example, say our peak production rate is 50 MB/s and the replication factor is 2.

Number of Kafka machines = 2 × (50 × 2 / 100) + 1 = 3

Starting the Flume Collection Agent

Run the following command in the /data0/apache-flume-1.9.0-bin/bin directory:

nohup flume-ng agent --name a1 --conf-file ../conf/file-flume-kafka.conf &

Consume the Kafka topics and confirm that data is already flowing in:

kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_start

kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_event

1606152976690|{"cm":{"ln":"-90.5","sv":"V2.0.7","os":"8.2.8","g":"ZPWDFI86@gmail.com","mid":"991","nw":"3G","l":"en","vc":"5","hw":"640*960","ar":"MX","uid":"991","t":"1606064013535","la":"-40.9","md":"Huawei-1","vn":"1.3.6","ba":"Huawei","sr":"Q"},"ap":"app","et":[{"ett":"1606139735962","en":"display","kv":{"goodsid":"244","action":"2","extend1":"1","place":"5","category":"81"}},{"ett":"1606060625200","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"18","loading_time":"0","action":"1","showtype":"3","category":"23","type1":"102"}},{"ett":"1606148719063","en":"loading","kv":{"extend2":"","loading_time":"45","action":"2","extend1":"","type":"2","type1":"102","loading_way":"1"}},{"ett":"1606112496011","en":"comment","kv":{"p_comment_id":1,"addtime":"1606069010840","praise_count":692,"other_id":0,"comment_id":1,"reply_count":58,"userid":5,"content":"爹钧异"}},{"ett":"1606138524102","en":"favorites","kv":{"course_id":8,"id":0,"add_time":"1606078090460","userid":2}}]}
1606152976691|{"cm":{"ln":"-58.1","sv":"V2.6.0","os":"8.0.4","g":"R2Q998F1@gmail.com","mid":"995","nw":"3G","l":"en","vc":"2","hw":"640*960","ar":"MX","uid":"995","t":"1606111827871","la":"6.4","md":"Huawei-17","vn":"1.0.5","ba":"Huawei","sr":"I"},"ap":"app","et":[{"ett":"1606129460089","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"42","loading_time":"0","action":"1","showtype":"5","category":"79","type1":"201"}},{"ett":"1606100900686","en":"ad","kv":{"entry":"3","show_style":"3","action":"4","detail":"102","source":"1","behavior":"1","content":"2","newstype":"8"}},{"ett":"1606098687596","en":"active_foreground","kv":{"access":"","push_id":"3"}},{"ett":"1606067052812","en":"active_background","kv":{"active_source":"3"}},{"ett":"1606068620224","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1606076123601","en":"favorites","kv":{"course_id":6,"id":0,"add_time":"1606133566208","userid":2}}]}

Consuming Kafka Data into HDFS with Flume

Next, we use a second Flume agent to consume the Kafka data and write it to HDFS.

Configuration

Deploy another Flume instance on the cdh2 node and create a kafka-flume-hdfs.conf file in the /data0/apache-flume-1.9.0-bin/conf directory:

[root@cdh2 conf]# cat kafka-flume-hdfs.conf 
## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## output compressed files (LZO)
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## wire sources, channels, and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

Starting the Flume Consumer Agent

Create the origin_data directory in HDFS:

hadoop fs -mkdir /origin_data

Run the following command in the /data0/apache-flume-1.9.0-bin/bin directory:

nohup flume-ng agent --name a1 --conf-file ../conf/kafka-flume-hdfs.conf &

The HDFS origin_data directory now contains data, so Flume is successfully consuming the Kafka topics into HDFS:

[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log
Found 2 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event
drwxr-xr-x   - hive hive          0 2020-11-24 10:19 /origin_data/gmall/log/topic_start
[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log/topic_event
Found 1 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event/2020-11-24
