现在的位置: 首页 > 综合 > 正文

storm入门

2018年05月20日 ⁄ 综合 ⁄ 共 2710字 ⁄ 字号 评论关闭

hadoop,  批处理,通过磁盘IO处理,吞吐量大;

strom,实时,通过内存处理,吞吐量小;

高频:高频率;

生产者和消费者之间使用Message Queue,有Kafka, Active MessageQ和Rabbit Mq

                             图:Hadoop与Storm的比较

       

mapreduce的组成由JobTracker和TaskTracker。其中,JobTracker是主节点,TaskTracker是从节点。
mapreduce执行的是job。job表示用户提交的一段计算代码。job在运行时,先提交给JobTracker,然后由JobTracker分给TaskTracker执行。
在job运行时,分为map阶段和reduce阶段。每个阶段中传递的数据单位是kv对。

storm的组成由nimbus和supervisor。其中,nimbus是主节点,supervisor是从节点。
storm执行的是topology。topology表示用户提交的一段计算代码。toplogy在运行时,先提交给nimbus,然后由nimbus分配给supervisor执行。
在topology运行时,分为spout阶段和bolt阶段。每个阶段中传递的数据单位是tuple。

                       图:创建一个Maven的项目

                                                                        图:编辑POM文件

    写Storm程序对该文件进行单词计数

package mystorm;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;

import clojure.main;

import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//本地模式
public class WordCountApp {
	
	public static void main(String[] args) {
		final TopologyBuilder topologyBuilder = new TopologyBuilder();

		topologyBuilder.setSpout("1", new MySpout());
		topologyBuilder.setBolt("2", new MyBolt()).shuffleGrouping("1");
		
		final HashMap conf = new HashMap();
		
		final LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology(WordCountApp.class.getSimpleName(), conf, topologyBuilder.createTopology());
	}
}

class MySpout extends BaseRichSpout{
	private static final long serialVersionUID = 1L;

	SpoutOutputCollector collector = null;

	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		this.collector = collector;
	}
	
	//最最重要的方法,处理数据的。简单认为是死循环的,监听文件内容的变化
	public void nextTuple() {
		try {
			final List<String> readLines = FileUtils.readLines(new File("h:/a.txt"));
			for (String line : readLines) {
				//把每一行看作一个tuple
				final Values tuple = new Values(line);
				//collector把tuple送出去,交给bolt处理
				collector.emit(tuple);
			}
			Thread.sleep(2000L);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}


	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		final Fields fields = new Fields("line");
		declarer.declare(fields);
	}
	
}

class MyBolt extends BaseRichBolt{

	public void execute(Tuple tuple) {
		final String line = tuple.getString(0);
		System.err.println(line);
		try {
			Thread.sleep(2000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
		// TODO Auto-generated method stub
		
	}

	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		// TODO Auto-generated method stub
		
	}
	
}

抱歉!评论已关闭.