现在的位置: 首页 > 综合 > 正文

Apache ActiveMQ 高级应用 – 自定义分发策略

2019年06月20日 ⁄ 综合 ⁄ 共 2122字 ⁄ 字号 评论关闭

在某些业务场景下,可能会用到按需分发消息。

对于AMQ他内置了很多的分发策略可供我们选择(DispatchPolicy的实现类),如:PriorityDispatchPolicy, PriorityNetworkDispatchPolicy, RoundRobinDispatchPolicy, SimpleDispatchPolicy, StrictOrderDispatchPolicy。

那我们也可以自己去实现DispatchPolicy接口,做一个适合特定业务场景的分发策略。

首先我们要去http://svn.apache.org/repos/asf/activemq这里下载对应版本的源码;

本例中为方便起见,我们拷贝了一段SimpleDispatchPolicy类的代码(路径:org.apache.activemq.broker.region.policy),当做我们的自定义类的内容,如:

public class TestDispatchPolicy implements DispatchPolicy {

    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {

    	ActiveMQDestination _destination = node.getMessage().getDestination();
    	
    	// 取得Topic name 和前缀,如:topic://Topic.foo
    	System.out.println("-------->_destination.getQualifiedName:"+ _destination.getQualifiedName());
    	// 取得Topic name,如:Topic.foo
    	System.out.println("-------->_destination.getPhysicalName:"+ _destination.getPhysicalName());
        synchronized (consumers) {
            int count = 0;
            for (Iterator iter = consumers.iterator(); iter.hasNext();) {
                Subscription sub = (Subscription)iter.next();
                // 取得消费者的clientId,如:connection.setClientID("YourClientID");
                System.out.println("-------->sub.getContext().getClientId:"+ sub.getContext().getClientId());
                // Only dispatch to interested subscriptions
                if (!sub.matches(node, msgContext)) {
                    sub.unmatched(node);
                    continue;
                }

                sub.add(node);
                count++;
            }
            return count > 0;
        }
    }

}

ok,接着我们做如下几个步骤:

1、将这个类放到activemq project/activemq-broker的相应路径下,在activemq-project/目录下执行mvn package;

2、待maven执行完成后,将activemq-broker-version.jar和activemq-spring-version.jar放入到apache-activemq-version-bin/lib/目录下,替换对应的文件;

3、修改activemq.xml,加入元素dispatchPolicy,完成。

Apache ActiveMQ单点基本配置 activemq.xml为基础,修改的内容如下:

 <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb">

		    <dispatchPolicy>
                         <! -- 新增的分发策略 -->
                         <testDispatchPolicy/>
		    </dispatchPolicy>
                    <pendingMessageLimitStrategy>
                         <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
                
              </policyEntries>
            </policyMap>
        </destinationPolicy>

至此我们就实现了Activemq的自定义分发策略功能,启动activemq,查看一下控制台。

参考资源:

http://activemq.apache.org/dispatch-policies.html

http://activemq.apache.org/maven/5.9.0/apidocs/index.html?deprecated-list.html

抱歉!评论已关闭.