A Storm Stream Processing Project Case Study


1. Project framework

  [screenshot: overall project framework]

 

 

====================== The program is debugged step by step =====================

Part One: The KafkaSpout and the driver class

1. Services running at this point

  [screenshot: list of running services]

 

2. Main driver class

package com.jun.it2;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.*;

import java.util.UUID;

public class WebLogStatictis {
    /**
     * Main entry point
     * @param args
     */
    public static void main(String[] args) {
        WebLogStatictis webLogStatictis = new WebLogStatictis();
        StormTopology stormTopology = webLogStatictis.createTopology();
        Config config = new Config();
        // cluster or local
        // conf.setNumAckers(4);
        if (args == null || args.length == 0) {
            // run locally
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("webloganalyse", config, stormTopology);
        } else {
            // submit to the cluster
            config.setNumWorkers(4); // how many worker processes to use for this topology
            try {
                StormSubmitter.submitTopology(args[0], config, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Build a KafkaSpout
     * @return
     */
    private IRichSpout generateSpout() {
        BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
        String topic = "nginxlog";
        String zkRoot = "/" + topic;
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
        spoutConf.forceFromStart = true;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
        return kafkaSpout;
    }

    public StormTopology createTopology() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // the spout
        topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID, generateSpout());
        // the parser bolt
        topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT, new WebLogParserBolt())
                .shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);
        return topologyBuilder.createTopology();
    }
}
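  Note that the local branch leaves the LocalCluster running until the process is killed. For a bounded local test, a common variant (my sketch, not part of the original code) sleeps for a while and then shuts the cluster down:

    // run locally for one minute, then stop the topology and the cluster
    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("webloganalyse", config, stormTopology);
    backtype.storm.utils.Utils.sleep(60 * 1000);
    localCluster.killTopology("webloganalyse");
    localCluster.shutdown();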

 

3. WebLogParserBolt

  This first version simply prints the data emitted by the KafkaSpout, so we can check that the tuples are arriving correctly.

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class WebLogParserBolt implements IRichBolt {
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    @Override
    public void execute(Tuple tuple) {
        // print the raw log line received from the KafkaSpout
        String webLog = tuple.getStringByField("str");
        System.out.println(webLog);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

4. Run Main

  It first consumes the data already in the topic (the spout sets forceFromStart, so it reads from the beginning).

  [screenshot: console output after starting Main]

 

5. Run the Kafka producer

   bin/kafka-console-producer.sh --topic nginxlog --broker-list linux-hadoop01.ibeifeng.com:9092
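  As an optional check not shown in the original article, the matching console consumer can be run in another terminal to confirm, independently of Storm, that messages are reaching the topic:

    bin/kafka-console-consumer.sh --zookeeper linux-hadoop01.ibeifeng.com:2181 --topic nginxlog --from-beginning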

  [screenshot: Kafka producer console]

 

6. Paste data into the Kafka producer console

  [screenshot: log lines pasted into the producer console]
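  The pasted data itself is not reproduced in the article, but a hypothetical line that the parser regex in Part Two accepts looks like this (note the bracketed field is an epoch-millisecond timestamp, which is how WebLogParserBolt parses it):

    192.168.1.1 - - [1548380000000] "GET /index.html HTTP/1.1" 200 1024 "http://www.example.com/" "Mozilla/5.0 (Windows NT 10.0)" "-"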

 

7. Output in the Main console

  [screenshot: messages printed in the Main console]

 

Part Two: Parsing the log

1. WebLogParserBolt

  To verify just the parsing, take the final version below and make three changes (a sketch of the resulting execute() body follows this list):

    delete the stream splitting

    delete the emits

    un-comment the print statements.
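  A minimal sketch of what execute() reduces to during this verification step, reconstructed from the commented-out prints in the final version below (everything here already exists in that code):

    @Override
    public void execute(Tuple tuple) {
        String webLog = tuple.getStringByField("str");
        if (webLog != null && !"".equals(webLog)) {
            Matcher matcher = pattern.matcher(webLog);
            if (matcher.find()) {
                // print the extracted groups instead of emitting them
                System.err.println("ip=" + matcher.group(1)
                        + ", serverTimeStr=" + matcher.group(2)
                        + ", requestUrl=" + matcher.group(3)
                        + ", httpRefer=" + matcher.group(4)
                        + ", userAgent=" + matcher.group(5));
            }
        }
        this.outputCollector.ack(tuple);
    }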

 

2. Result

  Starting the Main function is enough to verify this.

  [screenshot: parsed fields printed to the console]

 

3. WebLogParserBolt (final version)

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.jun.it2.WebLogConstants.*;

public class WebLogParserBolt implements IRichBolt {
    private Pattern pattern;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d+]*)\\] \\\"[^ ]* ([^ ]*) [^ ]*\\\" \\d{3} \\d+ \\\"([^\"]*)\\\" \\\"([^\"]*)\\\" \\\"[^ ]*\\\"");
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String webLog = tuple.getStringByField("str");
        if (webLog != null && !"".equals(webLog)) {
            Matcher matcher = pattern.matcher(webLog);
            if (matcher.find()) {
                String ip = matcher.group(1);
                String serverTimeStr = matcher.group(2);

                // convert the epoch-millis timestamp into day/hour/minute buckets
                long timestamp = Long.parseLong(serverTimeStr);
                Date date = new Date();
                date.setTime(timestamp);

                DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
                String dateStr = df.format(date);
                String day = dateStr.substring(0, 8);    // yyyyMMdd
                String hour = dateStr.substring(0, 10);  // yyyyMMddHH
                String minute = dateStr;                 // yyyyMMddHHmm

                String requestUrl = matcher.group(3);
                String httpRefer = matcher.group(4);
                String userAgent = matcher.group(5);

                // uncomment to verify the regex groups:
                // System.err.println(webLog);
                // System.err.println("ip=" + ip
                //         + ", serverTimeStr=" + serverTimeStr
                //         + ", requestUrl=" + requestUrl
                //         + ", httpRefer=" + httpRefer
                //         + ", userAgent=" + userAgent);

                // split into one stream per KPI
                this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple, new Values(day, hour, minute, ip));
                this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple, new Values(day, hour, minute, requestUrl));
                this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple, new Values(day, hour, minute, httpRefer));
                this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple, new Values(day, hour, minute, userAgent));
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM, new Fields(DAY, HOUR, MINUTE, IP));
        outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
        outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
        outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, USERAGENT));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

 

Part Three: A generic counter

1. CountKpiBolt

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class CountKpiBolt implements IRichBolt {

    private String kpiType;

    private Map<String, Integer> kpiCounts;

    private String currentDay = "";

    private OutputCollector outputCollector;

    public CountKpiBolt(String kpiType) {
        this.kpiType = kpiType;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.kpiCounts = new HashMap<>();
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String day = tuple.getStringByField("day");
        String hour = tuple.getStringByField("hour");
        String minute = tuple.getStringByField("minute");
        String kpi = tuple.getString(3);
        // combine each time bucket with the KPI value
        String kpiByDay = day + "_" + kpi;
        String kpiByHour = hour + "_" + kpi;
        String kpiByMinute = minute + "_" + kpi;
        // look up the current counts in the map
        int kpiCountByDay = 0;
        int kpiCountByHour = 0;
        int kpiCountByMinute = 0;
        if (kpiCounts.containsKey(kpiByDay)) {
            kpiCountByDay = kpiCounts.get(kpiByDay);
        }
        if (kpiCounts.containsKey(kpiByHour)) {
            kpiCountByHour = kpiCounts.get(kpiByHour);
        }
        if (kpiCounts.containsKey(kpiByMinute)) {
            kpiCountByMinute = kpiCounts.get(kpiByMinute);
        }
        kpiCountByDay++;
        kpiCountByHour++;
        kpiCountByMinute++;
        kpiCounts.put(kpiByDay, kpiCountByDay);
        kpiCounts.put(kpiByHour, kpiCountByHour);
        kpiCounts.put(kpiByMinute, kpiCountByMinute);
        // evict the previous day's entries when the day rolls over
        if (!currentDay.equals(day)) {
            Iterator<Map.Entry<String, Integer>> iter = kpiCounts.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Integer> entry = iter.next();
                if (entry.getKey().startsWith(currentDay)) {
                    iter.remove();
                }
            }
        }
        currentDay = day;
        // emit two fields: the composed key and its count
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByDay, kpiCountByDay));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByHour, kpiCountByHour));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByMinute, kpiCountByMinute));
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
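  As a concrete illustration with hypothetical values: for a tuple carrying day=20190125, hour=2019012510, minute=201901251030 and kpi=192.168.1.1, processed by the instance constructed with kpiType "I", the bolt maintains and emits:

    kpiByDay    = "20190125_192.168.1.1"      -> emit ("I_20190125_192.168.1.1", countByDay)
    kpiByHour   = "2019012510_192.168.1.1"    -> emit ("I_2019012510_192.168.1.1", countByHour)
    kpiByMinute = "201901251030_192.168.1.1"  -> emit ("I_201901251030_192.168.1.1", countByMinute)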

 

2. SaveBolt.java

  For now this bolt only prints the incoming counts.

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class SaveBolt implements IRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    @Override
    public void execute(Tuple tuple) {
        String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
        Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
        System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

3. Result

  [screenshot: KPI counts printed to the console]

 

Part Four: Saving to HBase

1. SaveBolt.java

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;

import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;

public class SaveBolt implements IRichBolt {
    private HTable table;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        Configuration configuration = HBaseConfiguration.create();
        try {
            table = new HTable(configuration, HBASE_TABLENAME);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
        Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
        // System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
        if (serverTimeAndKpi != null && kpiCounts != null) {
            // row key: the composed key; column qualifier: its time-bucket prefix
            Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
            String columnQualifier = serverTimeAndKpi.split("_")[0];
            put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
                    Bytes.toBytes(columnQualifier), Bytes.toBytes("" + kpiCounts));
            try {
                table.put(put);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

2. Services now running

  [screenshot: list of running services]

 

3. Create the table in HBase

  [screenshot: creating the table in the HBase shell]
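  The exact shell command is not reproduced in the text, but to match the constants class (HBASE_TABLENAME = "weblogstatictis", COLUMN_FAMILY = "info") it would presumably be:

    create 'weblogstatictis', 'info'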

 

4. Run the program

  The following error appears:

ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:269) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:246) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:775) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:260) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:256) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.User.getCurrent(User.java:160) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.security.UserProvider.getCurrent(UserProvider.java:89) [hbase-common-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HConnectionKey.<init>(HConnectionKey.java:70) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:267) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:199) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:161) [hbase-client-0.98.6-cdh5.3.6.jar:na]
    at com.jun.it2.SaveBolt.prepare(SaveBolt.java:27) [classes/:na]
    at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(executor.clj:699) [storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) [storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

 

5. A fix found online

  1. Download the Windows build of winutils

    On GitHub someone provides a Windows build of winutils; the project is at https://github.com/srccodes/hadoop-common-2.2.0-bin. Download the project's zip (the file is named hadoop-common-2.2.0-bin-master.zip) and extract it to any directory.

  2. Configure the environment variables

    Add a user variable HADOOP_HOME whose value is the directory the zip was extracted to, then append %HADOOP_HOME%\bin to the system Path variable.

    Run the program again and it executes normally.
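    Alternatively (a sketch of mine, not from the original article), the same effect can be achieved without touching environment variables by setting hadoop.home.dir at the very top of main(), before any Hadoop class is loaded:

    // hypothetical path: wherever hadoop-common-2.2.0-bin-master.zip was extracted
    System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");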

 

6. Screenshot

  [screenshot]

 

7. Add the configuration files

  This step is required when running on Windows.

  [screenshot: configuration files added to the project]
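  The file contents are not listed; presumably this is an hbase-site.xml placed on the classpath (for example in src/main/resources) so that HBaseConfiguration.create() in SaveBolt can locate the cluster. A minimal sketch, assuming the ZooKeeper address used elsewhere in this article:

    <?xml version="1.0"?>
    <configuration>
      <!-- assumption: the single ZooKeeper host used throughout this article -->
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>linux-hadoop01.ibeifeng.com</value>
      </property>
      <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
      </property>
    </configuration>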

 

8. Final result

  [screenshot: counter rows written to HBase]
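  To double-check the persisted counters from the HBase shell, a plain scan of the table should show one row per composed key:

    scan 'weblogstatictis'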

 

Part Five: PS, the complete program

1. Main driver class

package com.jun.it2;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.*;

import java.io.IOException;
import java.util.UUID;

public class WebLogStatictis {

    /**
     * Main entry point
     * @param args
     */
    public static void main(String[] args) throws IOException {
        WebLogStatictis webLogStatictis = new WebLogStatictis();
        StormTopology stormTopology = webLogStatictis.createTopology();
        Config config = new Config();
        // cluster or local
        // conf.setNumAckers(4);
        if (args == null || args.length == 0) {
            // run locally
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("webloganalyse2", config, stormTopology);
        } else {
            // submit to the cluster
            config.setNumWorkers(4); // how many worker processes to use for this topology
            try {
                StormSubmitter.submitTopology(args[0], config, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Build a KafkaSpout
     * @return
     */
    private IRichSpout generateSpout() {
        BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
        String topic = "nginxlog";
        String zkRoot = "/" + topic;
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
        spoutConf.forceFromStart = true;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
        return kafkaSpout;
    }

    public StormTopology createTopology() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // the spout
        topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID, generateSpout());
        // the WebLogParserBolt
        topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT, new WebLogParserBolt())
                .shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);
        // the CountKpiBolt: grouped on the source component, the stream id, and the grouping field
        topologyBuilder.setBolt(WebLogConstants.COUNT_IP_BOLT, new CountKpiBolt(WebLogConstants.IP_KPI))
                .fieldsGrouping(WebLogConstants.WEB_LOG_PARSER_BOLT, WebLogConstants.IP_COUNT_STREAM, new Fields(WebLogConstants.IP));
        // the SaveBolt: collects and persists the counts
        topologyBuilder.setBolt(WebLogConstants.SAVE_BOLT, new SaveBolt(), 3)
                .shuffleGrouping(WebLogConstants.COUNT_IP_BOLT);
        return topologyBuilder.createTopology();
    }
}

 

2. Constants class

package com.jun.it2;

public class WebLogConstants {
    // Spout and bolt IDs
    public static String KAFKA_SPOUT_ID = "kafkaSpoutId";
    public static final String WEB_LOG_PARSER_BOLT = "webLogParserBolt";
    public static final String COUNT_IP_BOLT = "countIpBolt";
    public static final String COUNT_BROWSER_BOLT = "countBrowserBolt";
    public static final String COUNT_OS_BOLT = "countOsBolt";
    public static final String USER_AGENT_PARSER_BOLT = "userAgentParserBolt";
    public static final String SAVE_BOLT = "saveBolt";

    // Stream IDs
    public static final String IP_COUNT_STREAM = "ipCountStream";
    public static final String URL_PARSER_STREAM = "urlParserStream";
    public static final String HTTPREFER_PARSER_STREAM = "httpReferParserStream";
    public static final String USERAGENT_PARSER_STREAM = "userAgentParserStream";
    public static final String BROWSER_COUNT_STREAM = "browserCountStream";
    public static final String OS_COUNT_STREAM = "osCountStream";

    // Tuple field names
    public static final String DAY = "day";
    public static final String HOUR = "hour";
    public static final String MINUTE = "minute";
    public static final String IP = "ip";
    public static final String REQUEST_URL = "requestUrl";
    public static final String HTTP_REFER = "httpRefer";
    public static final String USERAGENT = "userAgent";
    public static final String BROWSER = "browser";
    public static final String OS = "os";
    public static final String SERVERTIME_KPI = "serverTimeAndKpi";
    public static final String KPI_COUNTS = "kpiCounts";

    // KPI types
    public static final String IP_KPI = "I";
    public static final String URL_KPI = "U";
    public static final String BROWSER_KPI = "B";
    public static final String OS_KPI = "O";

    // HBase
    public static final String HBASE_TABLENAME = "weblogstatictis";
    public static final String COLUMN_FAMILY = "info";
}

 

3. Parser class

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.jun.it2.WebLogConstants.*;

public class WebLogParserBolt implements IRichBolt {
    private Pattern pattern;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d+]*)\\] \\\"[^ ]* ([^ ]*) [^ ]*\\\" \\d{3} \\d+ \\\"([^\"]*)\\\" \\\"([^\"]*)\\\" \\\"[^ ]*\\\"");
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String webLog = tuple.getStringByField("str");
        if (webLog != null && !"".equals(webLog)) {
            Matcher matcher = pattern.matcher(webLog);
            if (matcher.find()) {
                String ip = matcher.group(1);
                String serverTimeStr = matcher.group(2);

                // convert the epoch-millis timestamp into day/hour/minute buckets
                long timestamp = Long.parseLong(serverTimeStr);
                Date date = new Date();
                date.setTime(timestamp);

                DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
                String dateStr = df.format(date);
                String day = dateStr.substring(0, 8);    // yyyyMMdd
                String hour = dateStr.substring(0, 10);  // yyyyMMddHH
                String minute = dateStr;                 // yyyyMMddHHmm

                String requestUrl = matcher.group(3);
                String httpRefer = matcher.group(4);
                String userAgent = matcher.group(5);

                // uncomment to verify the regex groups:
                // System.err.println(webLog);
                // System.err.println("ip=" + ip
                //         + ", serverTimeStr=" + serverTimeStr
                //         + ", requestUrl=" + requestUrl
                //         + ", httpRefer=" + httpRefer
                //         + ", userAgent=" + userAgent);

                // split into one stream per KPI
                this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple, new Values(day, hour, minute, ip));
                this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple, new Values(day, hour, minute, requestUrl));
                this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple, new Values(day, hour, minute, httpRefer));
                this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple, new Values(day, hour, minute, userAgent));
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM, new Fields(DAY, HOUR, MINUTE, IP));
        outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
        outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
        outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, USERAGENT));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

4. Counter class

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class CountKpiBolt implements IRichBolt {

    private String kpiType;

    private Map<String, Integer> kpiCounts;

    private String currentDay = "";

    private OutputCollector outputCollector;

    public CountKpiBolt(String kpiType) {
        this.kpiType = kpiType;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.kpiCounts = new HashMap<>();
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String day = tuple.getStringByField("day");
        String hour = tuple.getStringByField("hour");
        String minute = tuple.getStringByField("minute");
        String kpi = tuple.getString(3);
        // combine each time bucket with the KPI value
        String kpiByDay = day + "_" + kpi;
        String kpiByHour = hour + "_" + kpi;
        String kpiByMinute = minute + "_" + kpi;
        // look up the current counts in the map
        int kpiCountByDay = 0;
        int kpiCountByHour = 0;
        int kpiCountByMinute = 0;
        if (kpiCounts.containsKey(kpiByDay)) {
            kpiCountByDay = kpiCounts.get(kpiByDay);
        }
        if (kpiCounts.containsKey(kpiByHour)) {
            kpiCountByHour = kpiCounts.get(kpiByHour);
        }
        if (kpiCounts.containsKey(kpiByMinute)) {
            kpiCountByMinute = kpiCounts.get(kpiByMinute);
        }
        kpiCountByDay++;
        kpiCountByHour++;
        kpiCountByMinute++;
        kpiCounts.put(kpiByDay, kpiCountByDay);
        kpiCounts.put(kpiByHour, kpiCountByHour);
        kpiCounts.put(kpiByMinute, kpiCountByMinute);
        // evict the previous day's entries when the day rolls over
        if (!currentDay.equals(day)) {
            Iterator<Map.Entry<String, Integer>> iter = kpiCounts.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Integer> entry = iter.next();
                if (entry.getKey().startsWith(currentDay)) {
                    iter.remove();
                }
            }
        }
        currentDay = day;
        // emit two fields: the composed key and its count
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByDay, kpiCountByDay));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByHour, kpiCountByHour));
        this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByMinute, kpiCountByMinute));
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

5. Save class

package com.jun.it2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;

import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;

public class SaveBolt implements IRichBolt {
    private HTable table;

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        Configuration configuration = HBaseConfiguration.create();
        try {
            table = new HTable(configuration, HBASE_TABLENAME);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
        Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
        System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
        if (serverTimeAndKpi != null && kpiCounts != null) {
            // row key: the composed key; column qualifier: its time-bucket prefix
            Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
            String columnQualifier = serverTimeAndKpi.split("_")[0];
            put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
                    Bytes.toBytes(columnQualifier), Bytes.toBytes("" + kpiCounts));
            try {
                table.put(put);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
