现在的位置: 首页 > 综合 > 正文

Apache ActiveMQ 负载均衡

2018年03月30日 ⁄ 综合 ⁄ 共 3012字 ⁄ 字号 评论关闭

前面讲到Apache ActiveMQ 集群配置方法 的实现,主要是为了解决点单故障的情况,如果要对AMQ进行分流、提高吞吐率,那么可以尝试搭建一个负载均衡的AMQ集群。

要搭建这样的一个环境也是非常的简单,我们只需要增加几行配置项到activemq.xml里就好,剩下的事情全部都由AMQ去做。

AMQ负载均衡的实现有三种方案:

1、static

2、Multicast Discovery

3、MasterSlave Discovery

可以参考官网实现:

http://activemq.apache.org/networks-of-brokers.html

本例使用的是静态路由static,他的缺点就是需要把已知的节点都要预先配置进去,不像动态路由灵活。

Apache ActiveMQ单点基本配置基础上,用新的activemq.xml替换:

activemq.xml:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerTester3" dataDirectory="${activemq.data}">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <!-- 属性enableAudit=false,是防止消息在回流后被当做重复消息而不被转发 -->
                <policyEntry queue=">" producerFlowControl="false" memoryLimit="10mb" enableAudit="false">
                     <!-- 属性replayWhenNoConsumers=true,保证在该节点断开,并重启后,且consumers已经连接到另外一个节点上的情况下,消息自动回流到原始节点 -->
                     <networkBridgeFilterFactory>
			<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
		     </networkBridgeFilterFactory>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!-- 该节点的配置必须要在persistenceAdapter节点之前 -->
	<networkConnectors>
            <!-- 静态路由 -->
            <networkConnector uri="static:(tcp://192.168.0.87:61617)" duplex="true"/>
	</networkConnectors>
       
        <persistenceAdapter>
           <kahaDB directory="${activemq.data}/kahadb"
					enableIndexWriteAsync="true"
					enableJournalDiskSyncs="false"/>
        </persistenceAdapter>

          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="1400 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
        </transportConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <import resource="jetty.xml"/>

</beans>

同样需要注意的是,如果在一台设备上部署多个AMQ实例,要注意修改对应的端口号。

e.g.:这里会存在一个问题,假设这里配置的节点数为3个节点(A,B,C),且配置了消息回流。

1、假设节点A不接收消费者,只接受生产者;

2、当消费者X连接到B节点消费A节点上的消息(由于AMQ的机制,B节点会预先消费A节点上一定数量的消息到B节点,默认值1000,且假设只有1000个消息待消费),在消费过程中B节点断开,且B节点上有未消费的消息存在,这时消费者X自动路由到C节点上想继续消费未消费完成的消息;

3、由于当前的未消费的消息都存储于B节点上,当B节点重启后按照当前的配置方式,AMQ的逻辑应该是当X与A节点建立连接后B节点消息会回流,且可以正常消费,而当前消费者X是连接到了C节点上。

如果在业务上发生上面的这个场景,以当前的集群负载配置方式是无法满足需求的。

抱歉!评论已关闭.