Hello everyone, good to see you again — I'm your friend 全栈君.
1. Project framework
====================== The program needs to be debugged step by step ======================
Part One: Step 1 — the KafkaSpout and the driver class
1. Services that need to be running at this point: ZooKeeper and Kafka
2. The main driver class
package com.jun.it2;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.*;

import java.util.UUID;

public class WebLogStatictis {
    /**
     * Main entry point
     * @param args
     */
    public static void main(String[] args) {
        WebLogStatictis webLogStatictis = new WebLogStatictis();
        StormTopology stormTopology = webLogStatictis.createTopology();
        Config config = new Config();
        // cluster or local
        //conf.setNumAckers(4);
        if (args == null || args.length == 0) {
            // run locally
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("webloganalyse", config, stormTopology);
        } else {
            // submit to the cluster
            config.setNumWorkers(4); // number of worker processes for this topology
            try {
                StormSubmitter.submitTopology(args[0], config, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Build a KafkaSpout
     * @return
     */
    private IRichSpout generateSpout() {
        BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
        String topic = "nginxlog";
        String zkRoot = "/" + topic;
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
        spoutConf.forceFromStart = true;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
        return kafkaSpout;
    }

    public StormTopology createTopology() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // register the spout
        topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID, generateSpout());
        // register the parser bolt
        topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT, new WebLogParserBolt()).shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);

        return topologyBuilder.createTopology();
    }

}
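For reference, the build needs storm-core and storm-kafka on the classpath. A sketch of the Maven coordinates is below; the 0.9.6 version is inferred from the storm-core-0.9.6.jar entries in the stack trace later in this post, and the Kafka client jars matching your broker are also required but omitted here:

<!-- Assumed coordinates; adjust versions to your cluster. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.6</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.6</version>
</dependency>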
3. WebLogParserBolt
This first version simply prints what the Kafka spout emits, so we can check that the data is correct.
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class WebLogParserBolt implements IRichBolt {
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

    }

    @Override
    public void execute(Tuple tuple) {
        String webLog = tuple.getStringByField("str");
        System.out.println(webLog);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
4. Run Main
Because forceFromStart is set to true, it first consumes the data already in the topic.
5. Start the Kafka console producer
bin/kafka-console-producer.sh --topic nginxlog --broker-list linux-hadoop01.ibeifeng.com:9092
6. Paste data into the Kafka producer console (a sample line is shown below)
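For example, a line in the format the parser from the next step expects looks like the one below. This is a made-up sample; note that the field in square brackets is an epoch timestamp in milliseconds, because the bolt parses it with Long.parseLong:

192.168.1.101 - - [1449455531000] "GET /course/detail.action HTTP/1.1" 200 3874 "http://www.ibeifeng.com/course.action" "Mozilla/5.0 (Windows NT 6.1; WOW64)" "-"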
7. Console output under Main
Part Two: Step 2 — parsing the log
1. WebLogParserBolt
To verify just the parsing, remove two parts and enable one commented block:
remove the stream splitting,
remove the emits,
and uncomment the print statements.
2. Result
This can be verified simply by running the Main function.
3.WebLogParserBolt
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.jun.it2.WebLogConstants.*;

public class WebLogParserBolt implements IRichBolt {
    private Pattern pattern;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d ]*)\\] \"[^ ]* ([^ ]*) [^ ]*\" \\d{3} \\d+ \"([^\"]*)\" \"([^\"]*)\" \"[^ ]*\"");
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String webLog = tuple.getStringByField("str");
        if (webLog != null && !"".equals(webLog)) {

            Matcher matcher = pattern.matcher(webLog);
            if (matcher.find()) {
                String ip = matcher.group(1);
                String serverTimeStr = matcher.group(2);

                // parse the server time (epoch milliseconds)
                long timestamp = Long.parseLong(serverTimeStr);
                Date date = new Date();
                date.setTime(timestamp);

                DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
                String dateStr = df.format(date);
                String day = dateStr.substring(0, 8);
                String hour = dateStr.substring(0, 10);
                String minute = dateStr;

                String requestUrl = matcher.group(3);
                String httpRefer = matcher.group(4);
                String userAgent = matcher.group(5);

                // uncomment to verify that the pattern matches correctly
                // System.err.println(webLog);
                // System.err.println(
                //         "ip=" + ip
                //         + ", serverTimeStr=" + serverTimeStr
                //         + ", requestUrl=" + requestUrl
                //         + ", httpRefer=" + httpRefer
                //         + ", userAgent=" + userAgent
                // );

                // split into per-KPI streams
                this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple, new Values(day, hour, minute, ip));
                this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple, new Values(day, hour, minute, requestUrl));
                this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple, new Values(day, hour, minute, httpRefer));
                this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple, new Values(day, hour, minute, userAgent));
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM, new Fields(DAY, HOUR, MINUTE, IP));
        outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
        outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
        outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, USERAGENT));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
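To sanity-check the pattern outside Storm, a small throwaway test like the following can be used. The class name and the log line are hypothetical (the line is the same made-up sample shown earlier); the pattern string is the one from prepare() above:

package com.jun.it2;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check of the parsing regex against one sample line.
public class WebLogParserTest {
    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d ]*)\\] \"[^ ]* ([^ ]*) [^ ]*\" \\d{3} \\d+ \"([^\"]*)\" \"([^\"]*)\" \"[^ ]*\"");
        String line = "192.168.1.101 - - [1449455531000] \"GET /course/detail.action HTTP/1.1\" 200 3874 \"http://www.ibeifeng.com/course.action\" \"Mozilla/5.0 (Windows NT 6.1; WOW64)\" \"-\"";
        Matcher matcher = pattern.matcher(line);
        if (matcher.find()) {
            System.out.println("ip        = " + matcher.group(1)); // 192.168.1.101
            System.out.println("timestamp = " + matcher.group(2)); // 1449455531000 (epoch millis)
            System.out.println("url       = " + matcher.group(3)); // /course/detail.action
            System.out.println("refer     = " + matcher.group(4));
            System.out.println("userAgent = " + matcher.group(5));
        } else {
            System.out.println("no match");
        }
    }
}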
Part Three: Step 3 — a generic counter
1.CountKpiBolt
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class CountKpiBolt implements IRichBolt {

    private String kpiType;

    private Map<String, Integer> kpiCounts;

    private String currentDay = "";

    private OutputCollector outputCollector;

    public CountKpiBolt(String kpiType) {
        this.kpiType = kpiType;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.kpiCounts = new HashMap<>();
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String day = tuple.getStringByField("day");
        String hour = tuple.getStringByField("hour");
        String minute = tuple.getStringByField("minute");
        String kpi = tuple.getString(3);
        // combine each date granularity with the KPI value
        String kpiByDay = day + "_" + kpi;
        String kpiByHour = hour + "_" + kpi;
        String kpiByMinute = minute + "_" + kpi;
        // keep the counts in the in-memory map
        int kpiCountByDay = 0;
        int kpiCountByHour = 0;
        int kpiCountByMinute = 0;
        if (kpiCounts.containsKey(kpiByDay)) {
            kpiCountByDay = kpiCounts.get(kpiByDay);
        }
        if (kpiCounts.containsKey(kpiByHour)) {
            kpiCountByHour = kpiCounts.get(kpiByHour);
        }
        if (kpiCounts.containsKey(kpiByMinute)) {
            kpiCountByMinute = kpiCounts.get(kpiByMinute);
        }
        kpiCountByDay++;
        kpiCountByHour++;
        kpiCountByMinute++;
        kpiCounts.put(kpiByDay, kpiCountByDay);
        kpiCounts.put(kpiByHour, kpiCountByHour);
        kpiCounts.put(kpiByMinute, kpiCountByMinute);
        // clear yesterday's entries once the day rolls over
        if (!currentDay.equals(day)) {
            // a new day has started
            Iterator<Map.Entry<String, Integer>> iter = kpiCounts.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Integer> entry = iter.next();
                if (entry.getKey().startsWith(currentDay)) {
                    iter.remove();
                }
            }
        }
        currentDay = day;
        // emit two fields: the time+KPI key and its count
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByDay, kpiCountByDay));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByHour, kpiCountByHour));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByMinute, kpiCountByMinute));
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
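To make the keying concrete, suppose the bolt was constructed with kpiType = "I" (the IP KPI) and receives a tuple with day=20151207, hour=2015120710, minute=201512071032, ip=192.168.1.101 (made-up values). It increments the three map entries 20151207_192.168.1.101, 2015120710_192.168.1.101 and 201512071032_192.168.1.101, then emits each as ("I_" + key, count) — for example ("I_20151207_192.168.1.101", 3) once three tuples for that IP and day have been seen.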
2. SaveBolt.java
For now it just prints.
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class SaveBolt implements IRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

    }

    @Override
    public void execute(Tuple tuple) {
        String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
        Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
        System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
3. Result
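With the illustrative values from above, the console prints lines such as: serverTimeAndKpi=I_20151207_192.168.1.101, kpiCounts=3.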
Part Four: Saving to HBase
1. SaveBolt.java
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;

import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;

public class SaveBolt implements IRichBolt {
    private HTable table;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        Configuration configuration = HBaseConfiguration.create();
        try {
            table = new HTable(configuration, HBASE_TABLENAME);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
        Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
        // System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
        if (serverTimeAndKpi != null && kpiCounts != null) {

            Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
            // the qualifier is the KPI type, i.e. the part of the key before the first underscore
            String columnQualifier = serverTimeAndKpi.split("_")[0];
            put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
                    Bytes.toBytes(columnQualifier), Bytes.toBytes("" + kpiCounts));

            try {
                table.put(put);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
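Continuing the made-up example from earlier, a count of 3 for IP 192.168.1.101 on 2015-12-07 would be stored as row key I_20151207_192.168.1.101, column info:I (the qualifier is the KPI type, taken from the key before the first underscore), with the count serialized as the string "3".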
2. Services currently running
3. Create the table in HBase (the shell command is sketched below)
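The table name and column family must match the constants class (weblogstatictis and info). A command along these lines in the HBase shell creates it:

create 'weblogstatictis', 'info'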
4. Run the program
The following error appears:
ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:269) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:246) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:775) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:260) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:256) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.User.getCurrent(User.java:160) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.UserProvider.getCurrent(UserProvider.java:89) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HConnectionKey.<init>(HConnectionKey.java:70) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:267) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:199) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:161) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at com.jun.it2.SaveBolt.prepare(SaveBolt.java:27) [classes/:na]
    at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(executor.clj:699) [storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) [storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
5. The fix found online
1. Download the Windows build of winutils
On GitHub someone provides a Windows build of winutils; the project is at https://github.com/srccodes/hadoop-common-2.2.0-bin. Download the project's zip package (the file is named hadoop-common-2.2.0-bin-master.zip) and unpack it to any directory.
2. Configure the environment variables
Add a user variable HADOOP_HOME whose value is the directory the zip was unpacked into, then append %HADOOP_HOME%\bin to the system Path variable.
Run the program again and it executes normally.
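As an alternative to the environment variable (an option, not what the original post does), the same thing can be set programmatically at the very beginning of main(), before any Hadoop class is loaded; the path below is a made-up example and should point at the directory that contains bin\winutils.exe:

// Hypothetical local path to the unpacked winutils directory.
System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");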
6. Screenshot
7. Add the configuration files
This is required when running on Windows; a minimal sketch of the HBase client configuration is shown below.
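HBaseConfiguration.create() reads hbase-site.xml from the classpath, so at minimum the ZooKeeper quorum must be available there. A minimal sketch, assuming the single-node setup used throughout this post (host and port are taken from the code above; anything else your cluster needs would have to be added):

<?xml version="1.0"?>
<configuration>
  <!-- Minimal client-side settings; values match the host used elsewhere in this post. -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>linux-hadoop01.ibeifeng.com</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>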
8. Final execution result
Part Five: PS — the complete program
1. The main driver class
package com.jun.it2;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.hadoop.fs.Path;
import storm.kafka.*;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

public class WebLogStatictis {

    /**
     * Main entry point
     * @param args
     */
    public static void main(String[] args) throws IOException {
        WebLogStatictis webLogStatictis = new WebLogStatictis();
        StormTopology stormTopology = webLogStatictis.createTopology();
        Config config = new Config();
        // cluster or local
        //conf.setNumAckers(4);
        if (args == null || args.length == 0) {
            // run locally
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("webloganalyse2", config, stormTopology);
        } else {
            // submit to the cluster
            config.setNumWorkers(4); // number of worker processes for this topology
            try {
                StormSubmitter.submitTopology(args[0], config, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Build a KafkaSpout
     * @return
     */
    private IRichSpout generateSpout() {
        BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
        String topic = "nginxlog";
        String zkRoot = "/" + topic;
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
        spoutConf.forceFromStart = true;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
        return kafkaSpout;
    }

    public StormTopology createTopology() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // register the spout
        topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID, generateSpout());
        // register WebLogParserBolt
        topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT, new WebLogParserBolt()).shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);
        // register CountKpiBolt; fieldsGrouping takes the source component id, the stream id, and the grouping fields
        topologyBuilder.setBolt(WebLogConstants.COUNT_IP_BOLT, new CountKpiBolt(WebLogConstants.IP_KPI))
                .fieldsGrouping(WebLogConstants.WEB_LOG_PARSER_BOLT, WebLogConstants.IP_COUNT_STREAM, new Fields(WebLogConstants.IP));
        // register SaveBolt: collects the counts and persists them
        topologyBuilder.setBolt(WebLogConstants.SAVE_BOLT, new SaveBolt(), 3)
                .shuffleGrouping(WebLogConstants.COUNT_IP_BOLT);
        return topologyBuilder.createTopology();
    }

}
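Because CountKpiBolt reads the KPI value positionally (getString(3)), the same class can count any of the four streams the parser declares. A sketch of wiring a URL counter onto this topology, reusing the stream IDs already declared — note the component id "countUrlBolt" is made up and is not in the constants class:

// Hypothetical extension inside createTopology(): count request URLs with the same generic bolt.
topologyBuilder.setBolt("countUrlBolt", new CountKpiBolt(WebLogConstants.URL_KPI))
        .fieldsGrouping(WebLogConstants.WEB_LOG_PARSER_BOLT,
                WebLogConstants.URL_PARSER_STREAM,
                new Fields(WebLogConstants.REQUEST_URL));
// To persist these counts too, chain another grouping onto the SaveBolt declaration:
// topologyBuilder.setBolt(WebLogConstants.SAVE_BOLT, new SaveBolt(), 3)
//         .shuffleGrouping(WebLogConstants.COUNT_IP_BOLT)
//         .shuffleGrouping("countUrlBolt");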
2. The constants class
package com.jun.it2;

public class WebLogConstants {
    // spout and bolt IDs
    public static String KAFKA_SPOUT_ID = "kafkaSpoutId";
    public static final String WEB_LOG_PARSER_BOLT = "webLogParserBolt";
    public static final String COUNT_IP_BOLT = "countIpBolt";
    public static final String COUNT_BROWSER_BOLT = "countBrowserBolt";
    public static final String COUNT_OS_BOLT = "countOsBolt";
    public static final String USER_AGENT_PARSER_BOLT = "userAgentParserBolt";
    public static final String SAVE_BOLT = "saveBolt";

    // stream IDs
    public static final String IP_COUNT_STREAM = "ipCountStream";
    public static final String URL_PARSER_STREAM = "urlParserStream";
    public static final String HTTPREFER_PARSER_STREAM = "httpReferParserStream";
    public static final String USERAGENT_PARSER_STREAM = "userAgentParserStream";
    public static final String BROWSER_COUNT_STREAM = "browserCountStream";
    public static final String OS_COUNT_STREAM = "osCountStream";

    // tuple field names
    public static final String DAY = "day";
    public static final String HOUR = "hour";
    public static final String MINUTE = "minute";
    public static final String IP = "ip";
    public static final String REQUEST_URL = "requestUrl";
    public static final String HTTP_REFER = "httpRefer";
    public static final String USERAGENT = "userAgent";
    public static final String BROWSER = "browser";
    public static final String OS = "os";
    public static final String SERVERTIME_KPI = "serverTimeAndKpi";
    public static final String KPI_COUNTS = "kpiCounts";

    // KPI types
    public static final String IP_KPI = "I";
    public static final String URL_KPI = "U";
    public static final String BROWSER_KPI = "B";
    public static final String OS_KPI = "O";

    // HBase
    public static final String HBASE_TABLENAME = "weblogstatictis";
    public static final String COLUMN_FAMILY = "info";
}
3. The parser class
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.jun.it2.WebLogConstants.*;

public class WebLogParserBolt implements IRichBolt {
    private Pattern pattern;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d ]*)\\] \"[^ ]* ([^ ]*) [^ ]*\" \\d{3} \\d+ \"([^\"]*)\" \"([^\"]*)\" \"[^ ]*\"");
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String webLog = tuple.getStringByField("str");
        if (webLog != null && !"".equals(webLog)) {

            Matcher matcher = pattern.matcher(webLog);
            if (matcher.find()) {
                String ip = matcher.group(1);
                String serverTimeStr = matcher.group(2);

                // parse the server time (epoch milliseconds)
                long timestamp = Long.parseLong(serverTimeStr);
                Date date = new Date();
                date.setTime(timestamp);

                DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
                String dateStr = df.format(date);
                String day = dateStr.substring(0, 8);
                String hour = dateStr.substring(0, 10);
                String minute = dateStr;

                String requestUrl = matcher.group(3);
                String httpRefer = matcher.group(4);
                String userAgent = matcher.group(5);

                // uncomment to verify that the pattern matches correctly
                // System.err.println(webLog);
                // System.err.println(
                //         "ip=" + ip
                //         + ", serverTimeStr=" + serverTimeStr
                //         + ", requestUrl=" + requestUrl
                //         + ", httpRefer=" + httpRefer
                //         + ", userAgent=" + userAgent
                // );

                // split into per-KPI streams
                this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple, new Values(day, hour, minute, ip));
                this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple, new Values(day, hour, minute, requestUrl));
                this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple, new Values(day, hour, minute, httpRefer));
                this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple, new Values(day, hour, minute, userAgent));
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM, new Fields(DAY, HOUR, MINUTE, IP));
        outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
        outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
        outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, USERAGENT));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
4. The counting class
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class CountKpiBolt implements IRichBolt {

    private String kpiType;

    private Map<String, Integer> kpiCounts;

    private String currentDay = "";

    private OutputCollector outputCollector;

    public CountKpiBolt(String kpiType) {
        this.kpiType = kpiType;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.kpiCounts = new HashMap<>();
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String day = tuple.getStringByField("day");
        String hour = tuple.getStringByField("hour");
        String minute = tuple.getStringByField("minute");
        String kpi = tuple.getString(3);
        // combine each date granularity with the KPI value
        String kpiByDay = day + "_" + kpi;
        String kpiByHour = hour + "_" + kpi;
        String kpiByMinute = minute + "_" + kpi;
        // keep the counts in the in-memory map
        int kpiCountByDay = 0;
        int kpiCountByHour = 0;
        int kpiCountByMinute = 0;
        if (kpiCounts.containsKey(kpiByDay)) {
            kpiCountByDay = kpiCounts.get(kpiByDay);
        }
        if (kpiCounts.containsKey(kpiByHour)) {
            kpiCountByHour = kpiCounts.get(kpiByHour);
        }
        if (kpiCounts.containsKey(kpiByMinute)) {
            kpiCountByMinute = kpiCounts.get(kpiByMinute);
        }
        kpiCountByDay++;
        kpiCountByHour++;
        kpiCountByMinute++;
        kpiCounts.put(kpiByDay, kpiCountByDay);
        kpiCounts.put(kpiByHour, kpiCountByHour);
        kpiCounts.put(kpiByMinute, kpiCountByMinute);
        // clear yesterday's entries once the day rolls over
        if (!currentDay.equals(day)) {
            // a new day has started
            Iterator<Map.Entry<String, Integer>> iter = kpiCounts.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Integer> entry = iter.next();
                if (entry.getKey().startsWith(currentDay)) {
                    iter.remove();
                }
            }
        }
        currentDay = day;
        // emit two fields: the time+KPI key and its count
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByDay, kpiCountByDay));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByHour, kpiCountByHour));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByMinute, kpiCountByMinute));
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
5. The save class
package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;

import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;

public class SaveBolt implements IRichBolt {
    private HTable table;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        Configuration configuration = HBaseConfiguration.create();
        try {
            table = new HTable(configuration, HBASE_TABLENAME);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
        Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
        System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
        if (serverTimeAndKpi != null && kpiCounts != null) {

            Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
            // the qualifier is the KPI type, i.e. the part of the key before the first underscore
            String columnQualifier = serverTimeAndKpi.split("_")[0];
            put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
                    Bytes.toBytes(columnQualifier), Bytes.toBytes("" + kpiCounts));

            try {
                table.put(put);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Publisher: 全栈程序员栈长. Please credit the source when reposting: https://javaforall.cn/107485.html Original link: https://javaforall.cn