Storm初体验

2022-06-29 21:31:52 浏览数 (1)

Storm 是一个开源的、大数据处理系统,与其他系统不同,它旨在用于分布式实时处理且与语言无关。了解更多请自己google,安装过程也请自己搜索。

做了一个简单的例子

package mapstorm;

import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields;

public class StormMain {

 public static void main(String[] args) throws Exception {

  TopologyBuilder builder = new TopologyBuilder();         builder.setSpout("word-reader", new WordReader());         builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");         builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));

        //Configuration         Config conf = new Config();         conf.put("wordsFile", args[0]);         conf.setDebug(true);       //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); StormSubmitter.submitTopology("wordCounterTopology", conf, builder.createTopology());       //  Thread.sleep(1000);         //StormSubmitter.("wordCounterTopology");     //  StormSubmitter.shutdown();         //Topology run         //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);         //LocalCluster cluster = new LocalCluster();         //cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());         //Thread.sleep(2000);         //cluster.shutdown();         //  }

}

----------------------------

package mapstorm;

import Java.util.HashMap; import java.util.Map;

import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {  private static final long serialVersionUID = 5678586644899822142L;  Integer id;     String name;     Map<String, Integer> counters;

 @Override  public void execute(Tuple input, BasicOutputCollector collector) {     String str = input.getString(0);         System.out.println("WordCounter word " str);         if(!counters.containsKey(str)){             counters.put(str, 1);         }else{             Integer c = counters.get(str) 1;             counters.put(str, c);         }  }

 @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {}

 @Override     public void cleanup() {         System.out.println("-- Word Counter [" name "-" id "] --");         for(Map.Entry<String, Integer> entry : counters.entrySet()){             System.out.println(entry.getKey() ": " entry.getValue());         }         System.out.println("finish-----------");     }

    @Override     public void prepare(Map stormConf, TopologyContext context) {         this.counters = new HashMap<String, Integer>();         this.name = context.getThisComponentId();         this.id = context.getThisTaskId();     } }

----------------------------------

package mapstorm;

import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {  public void cleanup() {   System.out.println("finish");  }

 @Override  public void execute(Tuple input, BasicOutputCollector collector) {   String sentence = input.getString(0);         String[] words = sentence.split(" ");         System.out.println("WordNormalizer recevie  " sentence);         for(String word : words){             word = word.trim();             if(!word.isEmpty()){                 word = word.toLowerCase();                 System.out.println("WordNormalizer recevie " sentence "words  " word);                 collector.emit(new Values(word));             }         }  }

 @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {   declarer.declare(new Fields("word"));  }

}

-------------------------------------

package mapstorm;

import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {  private SpoutOutputCollector collector;     private FileReader fileReader;     private String filePath;     private boolean completed = false;     public void ack(Object msgId) {         System.out.println("OK:" msgId);     }     public void close() {}

    public void fail(Object msgId) {         System.out.println("FAIL:" msgId);     }

 @Override  public void open(Map conf, TopologyContext context,    SpoutOutputCollector collector) {   try {             this.fileReader = new FileReader(conf.get("wordsFile").toString());         } catch (FileNotFoundException e) {             throw new RuntimeException("Error reading file [" conf.get("wordFile") "]");         }      this.filePath = conf.get("wordsFile").toString();         this.collector = collector;

 }

 @Override  public void nextTuple() {   if(completed){             try {                 Thread.sleep(1000);             } catch (InterruptedException e) {             }             return;         }         String str;         BufferedReader reader =new BufferedReader(fileReader);         try{             while((str = reader.readLine()) != null){              System.out.println("WordReader read" str);                 this.collector.emit(new Values(str),str);                 System.out.println("WordReader out" str);             }         }catch(Exception e){             throw new RuntimeException("Error reading tuple",e);         }finally{             completed = true;         }

 }

 @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {   declarer.declare(new Fields("line"));

 }

}

完成后打包成storm.jar

通过 storm jar storm.jar mapstorm.StormMain /data/words.txt 即可启动运行。PS：words.txt 要提前分发到各 Supervisor 节点的相应目录下。

可以通过storm ui页面看到Topology中多了一条任务。

如果要终止任务,执行 storm kill <topology-name> 即可,这里是 storm kill wordCounterTopology。

0 人点赞