参照上图
构建DRPC拓补图的拓补构造器:
package backtype.storm.drpc; import backtype.storm.Constants; import backtype.storm.ILocalDRPC; import backtype.storm.coordination.BatchBoltExecutor; import backtype.storm.coordination.CoordinatedBolt; import backtype.storm.coordination.CoordinatedBolt.FinishedCallback; import backtype.storm.coordination.CoordinatedBolt.IdStreamSpec; import backtype.storm.coordination.CoordinatedBolt.SourceArgs; import backtype.storm.coordination.IBatchBolt; import backtype.storm.generated.StormTopology; import backtype.storm.generated.StreamInfo; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.topology.*; import backtype.storm.tuple.Fields; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; // Trident subsumes the functionality provided by this class, so it's deprecated @Deprecated public class LinearDRPCTopologyBuilder { String _function; List<Component> _components = new ArrayList<Component>(); public LinearDRPCTopologyBuilder(String function) { _function = function; } public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) { return addBolt(new BatchBoltExecutor(bolt), parallelism); } public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) { return addBolt(bolt, 1); } @Deprecated public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) { if (parallelism == null) parallelism = 1; Component component = new Component(bolt, parallelism.intValue()); _components.add(component); return new InputDeclarerImpl(component); } @Deprecated public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) { return addBolt(bolt, null); } public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) { return addBolt(new BasicBoltExecutor(bolt), parallelism); } public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) { return addBolt(bolt, null); } public StormTopology createLocalTopology(ILocalDRPC drpc) { return createTopology(new DRPCSpout(_function, drpc)); } public StormTopology createRemoteTopology() { return createTopology(new DRPCSpout(_function)); } private StormTopology createTopology(DRPCSpout spout) { final String SPOUT_ID = "spout"; final String PREPARE_ID = "prepare-request"; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout); builder.setBolt(PREPARE_ID, new PrepareRequest()) .noneGrouping(SPOUT_ID); int i = 0; for (; i < _components.size(); i++) { Component component = _components.get(i); Map<String, SourceArgs> source = new HashMap<String, SourceArgs>(); if (i == 1) { source.put(boltId(i - 1), SourceArgs.single()); } else if (i >= 2) { source.put(boltId(i - 1), SourceArgs.all()); } IdStreamSpec idSpec = null; if (i == _components.size() - 1 && component.bolt instanceof FinishedCallback) { idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM); } BoltDeclarer declarer = builder.setBolt(boltId(i), new CoordinatedBolt(component.bolt, source, idSpec), component.parallelism); for (Map conf : component.componentConfs) { declarer.addConfigurations(conf); } if (idSpec != null) { declarer.fieldsGrouping(idSpec.getGlobalStreamId() .get_componentId(), PrepareRequest.ID_STREAM, new Fields("request")); } if (i == 0 && component.declarations.isEmpty()) { declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM); } else { String prevId; if (i == 0) { prevId = PREPARE_ID; } else { prevId = boltId(i - 1); } for (InputDeclaration declaration : component.declarations) { declaration.declare(prevId, declarer); } } if (i > 0) { declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID); } } IRichBolt lastBolt = _components.get(_components.size() - 1).bolt; OutputFieldsGetter getter = new OutputFieldsGetter(); lastBolt.declareOutputFields(getter); Map<String, StreamInfo> streams = getter.getFieldsDeclaration(); if (streams.size() != 1) { throw new RuntimeException( "Must declare exactly one stream from last bolt in LinearDRPCTopology"); } String outputStream = streams.keySet().iterator().next(); List<String> fields = streams.get(outputStream).get_output_fields(); if (fields.size() != 2) { throw new RuntimeException( "Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); } <span style="color:#ff0000;"><span style="background-color: rgb(153, 153, 153);">builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))</span> .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0))) .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request")); i++; <span style="background-color: rgb(153, 153, 153);">builder.setBolt(boltId(i), new ReturnResults()).noneGrouping(</span> boltId(i - 1)); return builder.createTopology();</span> } private static String boltId(int index) { return "bolt" + index; } private static class Component { public IRichBolt bolt; public int parallelism; public List<Map> componentConfs; public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); public Component(IRichBolt bolt, int parallelism) { this.bolt = bolt; this.parallelism = parallelism; this.componentConfs = new ArrayList(); } } private static interface InputDeclaration { public void declare(String prevComponent, InputDeclarer declarer); } private class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer { Component _component; public InputDeclarerImpl(Component component) { _component = component; } @Override public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.fieldsGrouping(prevComponent, fields); } }); return this; } @Override public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, final Fields fields) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.fieldsGrouping(prevComponent, streamId, fields); } }); return this; } @Override public LinearDRPCInputDeclarer globalGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.globalGrouping(prevComponent); } }); return this; } @Override public LinearDRPCInputDeclarer globalGrouping(final String streamId) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.globalGrouping(prevComponent, streamId); } }); return this; } @Override public LinearDRPCInputDeclarer shuffleGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.shuffleGrouping(prevComponent); } }); return this; } @Override public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.shuffleGrouping(prevComponent, streamId); } }); return this; } @Override public LinearDRPCInputDeclarer localOrShuffleGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.localOrShuffleGrouping(prevComponent); } }); return this; } @Override public LinearDRPCInputDeclarer localOrShuffleGrouping( final String streamId) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.localOrShuffleGrouping(prevComponent, streamId); } }); return this; } @Override public LinearDRPCInputDeclarer noneGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.noneGrouping(prevComponent); } }); return this; } @Override public LinearDRPCInputDeclarer noneGrouping(final String streamId) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.noneGrouping(prevComponent, streamId); } }); return this; } @Override public LinearDRPCInputDeclarer allGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.allGrouping(prevComponent); } }); return this; } @Override public LinearDRPCInputDeclarer allGrouping(final String streamId) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.allGrouping(prevComponent, streamId); } }); return this; } @Override public LinearDRPCInputDeclarer directGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.directGrouping(prevComponent); } }); return this; } @Override public LinearDRPCInputDeclarer directGrouping(final String streamId) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.directGrouping(prevComponent, streamId); } }); return this; } @Override public LinearDRPCInputDeclarer customGrouping( final CustomStreamGrouping grouping) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.customGrouping(prevComponent, grouping); } }); return this; } @Override public LinearDRPCInputDeclarer customGrouping(final String streamId, final CustomStreamGrouping grouping) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.customGrouping(prevComponent, streamId, grouping); } }); return this; } private void addDeclaration(InputDeclaration declaration) { _component.declarations.add(declaration); } @Override public LinearDRPCInputDeclarer addConfigurations(Map conf) { _component.componentConfs.add(conf); return this; } } }
package backtype.storm.drpc; import backtype.storm.Config; import backtype.storm.ILocalDRPC; import backtype.storm.generated.DRPCRequest; import backtype.storm.generated.DistributedRPCInvocations; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.ServiceRegistry; import backtype.storm.utils.Utils; import com.alibaba.fastjson.JSON; import org.apache.thrift7.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class DRPCSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class); SpoutOutputCollector _collector; List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>(); String _function; String _local_drpc_id = null; private static class DRPCMessageId { String id; int index; public DRPCMessageId(String id, int index) { this.id = id; this.index = index; } } public DRPCSpout(String function) { _function = function; } public DRPCSpout(String function, ILocalDRPC drpc) { _function = function; _local_drpc_id = drpc.getServiceId(); } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; if (_local_drpc_id == null) { int numTasks = context.getComponentTasks( context.getThisComponentId()).size(); int index = context.getThisTaskIndex(); int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT)); List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS); if (servers == null || servers.isEmpty()) { throw new RuntimeException( "No DRPC servers configured for topology"); } if (numTasks < servers.size()) { for (String s : servers) { _clients.add(new DRPCInvocationsClient(s, port)); } } else { int i = index % servers.size(); _clients.add(new DRPCInvocationsClient(servers.get(i), port)); } } } @Override public void close() { for (DRPCInvocationsClient client : _clients) { client.close(); } } @Override public void nextTuple() { boolean gotRequest = false; if (_local_drpc_id == null) { for (int i = 0; i < _clients.size(); i++) { DRPCInvocationsClient client = _clients.get(i); try { DRPCRequest req = client.fetchRequest(_function); if (req.get_request_id().length() > 0) { Map returnInfo = new HashMap(); returnInfo.put("id", req.get_request_id()); returnInfo.put("host", client.getHost()); returnInfo.put("port", client.getPort()); gotRequest = true; _collector.emit(new Values(req.get_func_args(), JSON.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i)); break; } } catch (TException e) { LOG.error("Failed to fetch DRPC result from DRPC server", e); } } } else { DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry .getService(_local_drpc_id); if (drpc != null) { // can happen during shutdown of drpc while // topology is still up try { DRPCRequest req = drpc.fetchRequest(_function); if (req.get_request_id().length() > 0) { Map returnInfo = new HashMap(); returnInfo.put("id", req.get_request_id()); returnInfo.put("host", _local_drpc_id); returnInfo.put("port", 0); gotRequest = true; _collector.emit(new Values(req.get_func_args(), JSON.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0)); } } catch (TException e) { throw new RuntimeException(e); } } } if (!gotRequest) { Utils.sleep(1); } } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { DRPCMessageId did = (DRPCMessageId) msgId; DistributedRPCInvocations.Iface client; if (_local_drpc_id == null) { client = _clients.get(did.index); } else { client = (DistributedRPCInvocations.Iface) ServiceRegistry .getService(_local_drpc_id); } try { client.failRequest(did.id); } catch (TException e) { LOG.error("Failed to fail request", e); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("args", "return-info")); } }
returnResult源码
package backtype.storm.drpc; import backtype.storm.Config; import backtype.storm.generated.DistributedRPCInvocations; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import backtype.storm.utils.ServiceRegistry; import backtype.storm.utils.Utils; import com.alibaba.fastjson.JSON; import org.apache.thrift7.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class ReturnResults extends BaseRichBolt { public static final Logger LOG = LoggerFactory .getLogger(ReturnResults.class); OutputCollector _collector; boolean local; Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local"); } @Override public void execute(Tuple input) { String result = (String) input.getValue(0); String returnInfo = (String) input.getValue(1); if (returnInfo != null) { Map retMap = (Map) JSON.parse(returnInfo); final String host = (String) retMap.get("host"); final int port = Utils.getInt(retMap.get("port")); String id = (String) retMap.get("id"); DistributedRPCInvocations.Iface client; if (local) { client = (DistributedRPCInvocations.Iface) ServiceRegistry .getService(host); } else { List server = new ArrayList() { { add(host); add(port); } }; if (!_clients.containsKey(server)) { _clients.put(server, new DRPCInvocationsClient(host, port)); } client = _clients.get(server); } try { client.result(id, result); _collector.ack(input); } catch (TException e) { LOG.error("Failed to return results to DRPC server", e); _collector.fail(input); } } } @Override public void cleanup() { for (DRPCInvocationsClient c : _clients.values()) { c.close(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }