现在的位置: 首页 > 综合 > 正文

storm学习三 drpc学习

2019年05月23日 ⁄ 综合 ⁄ 共 3429字 ⁄ 字号 评论关闭

1. DRPC介绍

Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理解为Storm是一个集群,DRPC提供了集群中处理功能的访问接口.

其实即使不通过DRPC,而是通过在Topoloye中的spout中建立一个TCP/HTTP监听来接收数据,在最后一个Bolt中将数据发送到指定位置也是可以的。这是后话,后面再进行介绍。而DPRC则是Storm提供的一套开发组建,使用DRPC可以极大的简化这一过程。通过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序可以调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式需要drpc服务器进行转发,其中drpc服务器底层通过thrift实现。适合的业务场景主要是实时计算。并且扩展性良好,可以增加每个节点的工作worker数量来动态扩展。

2.Strom drpc服务配置:

端口可以不用配置,默认是:3772

Nimbus节点的配置:

storm.zookeeper.servers:

    - "10.10.249.195"

    - "10.10.249.196"

#

# nimbus.host: "nimbus"

## Locations of the drpc servers

drpc.servers:

    - "10.10.249.197"

Supervisor节点的配置:

########### These MUST be filled in for astorm configuration

storm.zookeeper.servers:

    - "10.10.249.195"

    - "10.10.249.196"

#

nimbus.host: "10.10.249.195"

#

## Locations of the drpc servers

drpc.servers:

    - "10.10.249.197"

#    - "server2"

supervisor.slots.ports:

    -6700

    -6701

    -6702

2.DRPC的使用

DRPC包括服务端和客户端两部分。引用官方的一张图片来进行说明:

1)服务端

服务端由四部分组成:包括一个DRPC Server, 一个 DPRC Spout,一个Topology和一个ReturnResult。

在实际使用中,主要有三个步骤:

a.启动Storm中的DRPC Server;

   首先,修改Storm/conf/storm.yaml中的drpc server地址(上面已经给出例子);需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果。

  然后,通过 ./storm drpc     命令启动drpc server。

b.创建一个DRPC 的Topology,提交到storm中运行。

  该Toplogy和普通的Topology稍有不同,可以通过两种方式创建:

  创建方法一:直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,不建议使用)

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}

public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    		Config conf = new Config();
		conf.setDebug(true);
		if (args == null || args.length == 0) {
			LocalDRPC drpc = new LocalDRPC();
			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("drpc-demo", conf,
					builder.createLocalTopology(drpc));

			for (String word : new String[] { "hello", "goodbye" }) {
				System.err.println("Result for \"" + word + "\": "
						+ drpc.execute("exclamation", word));
			}

			cluster.shutdown();
			drpc.shutdown();
		} else {
			// conf.setNumWorkers(3);
			StormSubmitter.submitTopology("exclamation", conf,
					builder.createRemoteTopology());
		}
}

创建方法二:直接使用 Storm
提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。

                            TopologyBuilder builder = new TopologyBuilder(); 
			    //开始的Spout
			    DRPCSpout drpcSpout = new DRPCSpout("exclamation");
			    builder.setSpout("drpc-input", drpcSpout,5);
			   
                            //真正处理的Bolt 
			    builder.setBolt("cpp", new CppBolt(), 5)
			    		.noneGrouping("drpc-input");
                           
                            //结束的ReturnResults
			    builder.setBolt("return", new ReturnResults(),5)
	    		.noneGrouping("cpp");
	    
			    Config conf = new Config();
			    conf.setDebug(false);
			    conf.setMaxTaskParallelism(3);
			    
			    try
			    {
			    	StormSubmitter.submitTopology("exclamation", conf,builder.createTopology());
			    }
			    catch (Exception e)
			    {
			    	e.printStackTrace();
			    }

c.通过DRPCClient对Cluster进行访问

需要修改客户端配置文件 ~/.storm/storm.yaml,配置drpc server的地址。修改方法可storm服务端一样。

访问代码就很简单了:

DRPCClient client = new DRPCClient("10.100.211.232", 3772);
String result = client.execute("exclamation","test");

注意如果是本地模式,topology的提交和drpc的访问都有不同。

    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-demo", conf,builder.createLocalTopology(drpc));
        // 访问
    for (String word : new String[] { "hello", "goodbye" }) {
    System.err.println("Result for \"" + word + "\": "+ drpc.execute("exclamation", word));
	}
    cluster.shutdown();
    drpc.shutdown();

抱歉!评论已关闭.