hadoop, 批处理,通过磁盘IO处理,吞吐量大;
strom,实时,通过内存处理,吞吐量小;
高频:高频率;
生产者和消费者之间使用Message Queue,有Kafka, Active MessageQ和Rabbit Mq
图:Hadoop与Storm的比较
mapreduce的组成由JobTracker和TaskTracker。其中,JobTracker是主节点,TaskTracker是从节点。
mapreduce执行的是job。job表示用户提交的一段计算代码。job在运行时,先提交给JobTracker,然后由JobTracker分给TaskTracker执行。
在job运行时,分为map阶段和reduce阶段。每个阶段中传递的数据单位是kv对。
storm的组成由nimbus和supervisor。其中,nimbus是主节点,supervisor是从节点。
storm执行的是topology。topology表示用户提交的一段计算代码。toplogy在运行时,先提交给nimbus,然后由nimbus分配给supervisor执行。
在topology运行时,分为spout阶段和bolt阶段。每个阶段中传递的数据单位是tuple。
图:创建一个Maven的项目
图:编辑POM文件
写Storm程序对该文件进行单词计数
package mystorm; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import clojure.main; import backtype.storm.LocalCluster; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; //本地模式 public class WordCountApp { public static void main(String[] args) { final TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("1", new MySpout()); topologyBuilder.setBolt("2", new MyBolt()).shuffleGrouping("1"); final HashMap conf = new HashMap(); final LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(WordCountApp.class.getSimpleName(), conf, topologyBuilder.createTopology()); } } class MySpout extends BaseRichSpout{ private static final long serialVersionUID = 1L; SpoutOutputCollector collector = null; public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector = collector; } //最最重要的方法,处理数据的。简单认为是死循环的,监听文件内容的变化 public void nextTuple() { try { final List<String> readLines = FileUtils.readLines(new File("h:/a.txt")); for (String line : readLines) { //把每一行看作一个tuple final Values tuple = new Values(line); //collector把tuple送出去,交给bolt处理 collector.emit(tuple); } Thread.sleep(2000L); } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { final Fields fields = new Fields("line"); declarer.declare(fields); } } class MyBolt extends BaseRichBolt{ public void execute(Tuple tuple) { final String line = tuple.getString(0); System.err.println(line); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } } public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } }