Storm 是一个开源的大数据处理系统;与其他系统不同,它旨在用于分布式实时处理,且与编程语言无关。了解更多请自行搜索,安装过程也请自行搜索。
做了一个简单的例子
package mapstorm;
import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields;
public class StormMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader", new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(true); //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); StormSubmitter.submitTopology("wordCounterTopology", conf, builder.createTopology()); // Thread.sleep(1000); //StormSubmitter.("wordCounterTopology"); // StormSubmitter.shutdown(); //Topology run //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); //LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); //Thread.sleep(2000); //cluster.shutdown(); // }
}
----------------------------
package mapstorm;
import Java.util.HashMap; import java.util.Map;
import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt { private static final long serialVersionUID = 5678586644899822142L; Integer id; String name; Map<String, Integer> counters;
@Override public void execute(Tuple input, BasicOutputCollector collector) { String str = input.getString(0); System.out.println("WordCounter word " str); if(!counters.containsKey(str)){ counters.put(str, 1); }else{ Integer c = counters.get(str) 1; counters.put(str, c); } }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override public void cleanup() { System.out.println("-- Word Counter [" name "-" id "] --"); for(Map.Entry<String, Integer> entry : counters.entrySet()){ System.out.println(entry.getKey() ": " entry.getValue()); } System.out.println("finish-----------"); }
@Override public void prepare(Map stormConf, TopologyContext context) { this.counters = new HashMap<String, Integer>(); this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } }
----------------------------------
package mapstorm;
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt { public void cleanup() { System.out.println("finish"); }
@Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); System.out.println("WordNormalizer recevie " sentence); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word = word.toLowerCase(); System.out.println("WordNormalizer recevie " sentence "words " word); collector.emit(new Values(word)); } } }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
}
-------------------------------------
package mapstorm;
import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout { private SpoutOutputCollector collector; private FileReader fileReader; private String filePath; private boolean completed = false; public void ack(Object msgId) { System.out.println("OK:" msgId); } public void close() {}
public void fail(Object msgId) { System.out.println("FAIL:" msgId); }
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file [" conf.get("wordFile") "]"); } this.filePath = conf.get("wordsFile").toString(); this.collector = collector;
}
@Override public void nextTuple() { if(completed){ try { Thread.sleep(1000); } catch (InterruptedException e) { } return; } String str; BufferedReader reader =new BufferedReader(fileReader); try{ while((str = reader.readLine()) != null){ System.out.println("WordReader read" str); this.collector.emit(new Values(str),str); System.out.println("WordReader out" str); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; }
}
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line"));
}
}
完成后打包成storm.jar
通过 storm jar storm.jar mapstorm.StormMain /data/words.txt 即可启动运行。PS:words.txt 要分发到各 Supervisor 的相应目录下。
可以通过storm ui页面看到Topology中多了一条任务。
如果要终止任务,执行 storm kill <topology-name> 即可,这里是 storm kill wordCounterTopology。