storm的程序开发可以理解为三步骤
(1)Spout
(2)Bolt
(3)main类(这个应该一成不变)
废话不多讲,直接上代码:
package main.java.storm.cookbook;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Spout that emits one single-field tuple per {@link #nextTuple()} call.
 *
 * <p>A reference number in {@code [0, MAX_RANDOM)} is drawn once in the constructor.
 * On every call a fresh number from the same range is drawn; when it equals the
 * reference the sentence {@code "Hello World"} is emitted, otherwise
 * {@code "Other Random Word"}. The tuple's only field is named {@code "sentence"}.
 */
public class HelloWorldSpout extends BaseRichSpout {

    private static final long serialVersionUID = -4646687160233411001L;

    /** Exclusive upper bound for both the reference number and the per-tuple draw. */
    private static final int MAX_RANDOM = 10;

    private SpoutOutputCollector collector;

    /** Number drawn once at construction time; a matching draw yields "Hello World". */
    private final int referenceRandom;

    /**
     * Generator used by {@link #nextTuple()}. It is transient and created in
     * {@link #open} so that it is not part of the serialized spout and is allocated
     * once per opened instance instead of once per emitted tuple.
     */
    private transient Random random;

    /** Draws the reference number that later draws are compared against. */
    public HelloWorldSpout() {
        referenceRandom = new Random().nextInt(MAX_RANDOM);
    }

    /**
     * Stores the collector used for emitting and creates the random generator.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector that tuples are emitted through
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Pauses via {@code Utils.sleep(100)}, then emits exactly one tuple whose value
     * depends on whether the new draw equals the reference number.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        int instanceRandom = random.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }

    /** Declares the single output field, {@code "sentence"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
Bolt代码:
package main.java.storm.cookbook;

import java.util.Map;

import org.apache.log4j.Logger;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that counts how many incoming tuples carry the sentence {@code "Hello World"}.
 *
 * <p>Each tuple's {@code "sentence"} field is read; on a match the per-instance counter
 * is incremented and the new value is written to standard output and to the debug log.
 * This bolt never emits or acks tuples.
 */
public class HelloWorldBolt extends BaseRichBolt {

    public static Logger LOG = Logger.getLogger(HelloWorldBolt.class);

    private static final long serialVersionUID = -841805977046116528L;

    /** Sentence value that triggers a count. */
    private static final String HELLO_WORLD = "Hello World";

    /** Number of matching tuples seen by this bolt instance. */
    private int myCount = 0;

    /**
     * No-op: nothing is emitted by this bolt, so the collector is not retained.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector output collector (unused)
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    /**
     * Increments and reports the counter when the tuple's {@code "sentence"} field
     * equals {@code "Hello World"}.
     *
     * @param input tuple expected to carry a string field named {@code "sentence"}
     */
    @Override
    public void execute(Tuple input) {
        String test = input.getStringByField("sentence");
        // Compare by value, not by reference: a string that has been serialized and
        // deserialized is a different object than the literal, so `==` would never match.
        // Calling equals on the constant also keeps a null field value from throwing.
        if (HELLO_WORLD.equals(test)) {
            myCount++;
            String message = "Found a Hello World! My Count is now: " + myCount;
            System.out.println(message);
            LOG.debug(message);
        }
    }

    /**
     * Declares a single output field, {@code "myCount"}.
     * NOTE(review): no tuple is ever emitted on this stream by the code in this class.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("myCount"));
    }
}
主类开发:
package main.java.storm.cookbook; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; public class HelloWorldTopology { /** * @param args * @throws Exception * @throws */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 2); builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 10) .<u><span style="color:#ff0000;">shuffleGrouping("randomHelloWorld"); //设置依赖,说明接受哪边来的数据</span></u> Config conf = new Config(); conf.setDebug(true); if(args!=null && args.length > 0) { conf.setNumWorkers(20); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); } } }