现在的位置: 首页 > 综合 > 正文

ActiveMQ in Action(6)

2018年08月07日 ⁄ 综合 ⁄ 共 11632字 ⁄ 字号 评论关闭

2.6 Features
    ActiveMQ包含了很多功能强大的特性,下面简要介绍其中的几个。
2.6.1
Exclusive Consumer
   
Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保
证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如,你可能不希望在插入订单操作结束之前执行更新这个订单的操
作。
    ActiveMQ从4.x版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。
Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer
失效,那么broker会自动切换到其它的consumer。
    可以通过Destination Options
来创建一个Exclusive Consumer,如下:

Java代码 复制代码
  1. queue = 
    new


     ActiveMQQueue(
    "TEST.QUEUE?consumer.exclusive=true"

    );
      

  2. consumer = session.createConsumer(queue);  
Java代码
  1. queue = 
    new
     ActiveMQQueue(
    "TEST.QUEUE?consumer.exclusive=true"
    );  
  2. consumer = session.createConsumer(queue);  

    顺便说一下,可以给consumer设置优先级,以便针对网络情况(如network hops)进行优化,如下:

Java代码 复制代码
  1. queue = 
    new


     ActiveMQQueue(
    "TEST.QUEUE?consumer.exclusive=true &consumer.priority=10"

    );  
Java代码
  1. queue = 
    new
     ActiveMQQueue(
    "TEST.QUEUE?consumer.exclusive=true &consumer.priority=10"
    );  

 

2.6.2 Message Groups
    用Apache官方文档的话说,Message Groups
rock!它是Exclusive Consumer功能的增强。逻辑上,Message Groups 可以看成是一种并发的Exclusive
Consumer。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message
group。Message Groups特性保证所有具有相同JMSXGroupID
的消息会被分发到相同的consumer(只要这个consumer保持active)。另外一方面,Message
Groups特性也是一种负载均衡的机制。
   
在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker
会检查是否有某个consumer拥有这个message
group。如果没有,那么broker会选择一个consumer,并将它关联到这个message
group。此后,这个consumer会接收这个message group的所有消息,直到:

  • Consumer被关闭。
  • Message group被关闭。通过发送一个消息,并设置这个消息的JMSXGroupSeq为0。

   从4.1版本开始,ActiveMQ支持一个布尔字段JMSXGroupFirstForConsumer 。当某个message
group的第一个消息被发送到consumer的时候,这个字段被设置。如果客户使用failover
transport连接到broker。在由于网络问题等造成客户重新连接到broker的时候,相同message
group的消息可能会被分发到不同与之前的consumer,因此JMSXGroupFirstForConsumer字段也会被重新设置。 

   以下是使用message groups的例子:

Java代码 复制代码
  1. Mesasge message = session.createTextMessage(
    "<foo>hey</foo>"

    );
      

  2. message.setStringProperty(
    "JMSXGroupID"


    "IBM_NASDAQ_20/4/05"

    );
      
  3. ...   
  4. producer.send(message);  
Java代码
  1. Mesasge message = session.createTextMessage(
    "<foo>hey</foo>"
    );  
  2. message.setStringProperty("JMSXGroupID"

    "IBM_NASDAQ_20/4/05"
    );  
  3. ...  
  4. producer.send(message);  


2.6.3 JMS Selectors
    JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤。JMS
Selectors由SQL92语法定义。以下是个Selectors的例子:

Java代码 复制代码
  1. consumer = session.createConsumer(destination, 
    "JMSType = 'car' AND weight > 2500"

    );  
Java代码
  1. consumer = session.createConsumer(destination, 
    "JMSType = 'car' AND weight > 2500"
    );  

     在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如:
    LIKE '12%3'
('123' true,'12993' true,'1234' false)
    LIKE 'l_se' ('lose'
true,'loose' false)
    LIKE '/_%' ESCAPE '/' ('_foo' true,'foo'
false)
    需要注意的是,JMS
Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换,例如:

Java代码 复制代码
  1. myMessage.setStringProperty(
    "NumberOfOrders"


    "2"

    );  
Java代码
  1. myMessage.setStringProperty(
    "NumberOfOrders"

    "2"
    );  

    "NumberOfOrders > 1" 求值结果是false。关于JMS
Selectors的详细文档请参考javax.jms.Message的javadoc。
    上一小节介绍的Message
Groups虽然可以保证具有相同message
group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message
Groups可以和JMS Selector一起工作,例如:
   
设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message
groups分别是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID =
'A'"作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer
A处理。需要注意的是,这种做法有以下缺点:

  • producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
  • 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。

2.6.4 Pending Message Limit Strategy
   
首先简要介绍一下prefetch机制。ActiveMQ通过prefetch机制来提高性能,这意味这客户端的内存里可能会缓存一定数量的消息。缓存消
息的数量由prefetch limit来控制。当某个consumer的prefetch
buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认。可以通过在
ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象
来配置prefetch policy。也可以通过connection options或者destination options来配置。例如:
   
tcp://localhost:61616?jms.prefetchPolicy.all=50
   
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
    queue =
new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
   
prefetch size的缺省值如下:

  • persistent queues (default value: 1000)
  • non-persistent queues (default value: 1000)
  • persistent topics (default value: 100)
  • non-persistent topics (default value: Short.MAX_VALUE -1)

   
慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。未来
ActiveMQ可能会实现磁盘缓存,但是这也还是会存在性能问题。目前ActiveMQ使用Pending Message Limit
Strategy来解决这个问题。除了prefetch
buffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到来时会丢弃旧消息。通过在配置文件的destination
map中配置PendingMessageLimitStrategy,可以为不用的topic namespace配置不同的策略。目前有以下两种:

  • ConstantPendingMessageLimitStrategy。这个策略使用常量限制。

    如:<constantPendingMessageLimitStrategy limit="50"/>
  • PrefetchRatePendingMessageLimitStrategy。这个策略使用prefetch size的倍数限制。

    如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

   在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消息;如果设置-1意味着禁止丢弃消息。 
   
此外,你还可以配置消息的丢弃策略,目前有以下两种:

  • oldestMessageEvictionStrategy。这个策略丢弃最旧的消息。
  • oldestMessageWithLowestPriorityEvictionStrategy。这个策略丢弃最旧的,而且具有最低优先级的
    消息。

   以下是个ActiveMQ配置文件的例子:

Xml代码 复制代码
  1. <
    broker


     
    persistent

    =
    "false"

     
    brokerName

    =
    "${brokername}"

     
    xmlns

    =
    "http://activemq.org/config/1.0"

    >


      
  2.     
    <
    destinationPolicy
    >


      
  3.       
    <
    policyMap
    >


      
  4.         
    <
    policyEntries
    >


      
  5.           
    <
    policyEntry


     
    topic

    =
    "PRICES.>"

    >


      
  6.             
    <!--  10 seconds worth -->

      
  7.             
    <
    subscriptionRecoveryPolicy
    >


      
  8.               
    <
    timedSubscriptionRecoveryPolicy


     
    recoverDuration

    =
    "10000"

     
    />


      
  9.             
    </
    subscriptionRecoveryPolicy
    >


      
  10.                
  11.             
    <!-- lets force old messages to be discarded for slow consumers -->

      
  12.             
    <
    pendingMessageLimitStrategy
    >


      
  13.               
    <
    constantPendingMessageLimitStrategy


     
    limit

    =
    "10"

    />


      
  14.             
    </
    pendingMessageLimitStrategy
    >


      
  15.           
    </
    policyEntry
    >


      
  16.         
    </
    policyEntries
    >


      
  17.       
    </
    policyMap
    >


      
  18.     
    </
    destinationPolicy
    >


      
  19.     ...   
  20. </
    broker
    >


      
Xml代码
  1. <
    broker
     
    persistent
    =
    "false"
     
    brokerName
    =
    "${brokername}"
     
    xmlns
    =
    "http://activemq.org/config/1.0"
    >
      
  2.     <
    destinationPolicy
    >
      
  3.       <
    policyMap
    >
      
  4.         <
    policyEntries
    >
      
  5.           <
    policyEntry
     
    topic
    =
    "PRICES.>"
    >
      
  6.             <!--  10 seconds worth -->
      
  7.             <
    subscriptionRecoveryPolicy
    >
      
  8.               <
    timedSubscriptionRecoveryPolicy
     
    recoverDuration
    =
    "10000"
     
    />
      
  9.             </
    subscriptionRecoveryPolicy
    >
      
  10.               
  11.             <!-- lets force old messages to be discarded for slow consumers -->
      
  12.             <
    pendingMessageLimitStrategy
    >
      
  13.               <
    constantPendingMessageLimitStrategy
     
    limit
    =
    "10"
    />
      
  14.             </
    pendingMessageLimitStrategy
    >
      
  15.           </
    policyEntry
    >
      
  16.         </
    policyEntries
    >
      
  17.       </
    policyMap
    >
      
  18.     </
    destinationPolicy
    >
      
  19.     ...  
  20. </
    broker
    >
      

 

2.6.5 Composite Destinations
    从1.1版本起, ActiveMQ支持composite
destinations。它允许用一个虚拟的destination 代表多个destinations。例如你可以通过composite
destinations在一个操作中同时向12个queue发送消息。在composite
destinations中,多个destination之间采用","分割。例如:

Java代码 复制代码
  1. Queue queue = 
    new


     ActiveMQQueue(
    "FOO.A,FOO.B,FOO.C"

    );  
Java代码
  1. Queue queue = 
    new
     ActiveMQQueue(
    "FOO.A,FOO.B,FOO.C"
    );  

   如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如: 

Java代码 复制代码
  1. Queue queue = 
    new


     ActiveMQQueue(
    "FOO.A,topic://NOTIFY.FOO.A"

    );   
Java代码
  1. Queue queue = 
    new
     ActiveMQQueue(
    "FOO.A,topic://NOTIFY.FOO.A"
    );   

   以下是ActiveMQ配置文件进行配置的一个例子:

Xml代码 复制代码
  1. <
    destinationInterceptors
    >


      
  2.   
    <
    virtualDestinationInterceptor
    >


      
  3.     
    <
    virtualDestinations
    >


      
  4.       
    <
    compositeQueue


     
    name

    =
    "MY.QUEUE"

    >


      
  5.         
    <
    forwardTo
    >


      
  6.           
    <
    queue


     
    physicalName

    =
    "FOO"

     
    />


      
  7.           
    <
    topic


     
    physicalName

    =
    "BAR"

     
    />


      
  8.         
    </
    forwardTo
    >


      
  9.       
    </
    compositeQueue
    >


      
  10.     
    </
    virtualDestinations
    >


      
  11.   
    </
    virtualDestinationInterceptor
    >


      
  12. </
    destinationInterceptors
    >


      
Xml代码
  1. <
    destinationInterceptors
    >
      
  2.   <
    virtualDestinationInterceptor
    >
      
  3.     <
    virtualDestinations
    >
      
  4.       <
    compositeQueue
     
    name
    =
    "MY.QUEUE"
    >
      
  5.         <
    forwardTo
    >
      
  6.           <
    queue
     
    physicalName
    =
    "FOO"
     
    />
      
  7.           <
    topic
     
    physicalName
    =
    "BAR"
     
    />
      
  8.         </
    forwardTo
    >
      
  9.       </
    compositeQueue
    >
      
  10.     </
    virtualDestinations
    >
      
  11.   </
    virtualDestinationInterceptor
    >
      
  12. </
    destinationInterceptors
    >
      

   可以在转发前,先通过JMS Selector判断一个消息是否需要转发,例如:

Xml代码 复制代码
  1. <
    destinationInterceptors
    >


      
  2.   
    <
    virtualDestinationInterceptor
    >


      
  3.     
    <
    virtualDestinations
    >


      
  4.       
    <
    compositeQueue


     
    name

    =
    "MY.QUEUE"

    >


      
  5.         
    <
    forwardTo
    >


      
  6.           
    <
    filteredDestination


     
    selector

    =
    "odd = 'yes'"

     
    queue

    =
    "FOO"

    />


      
  7.           
    <
    filteredDestination


     
    selector

    =
    "i = 5"

     
    topic

    =
    "BAR"

    />


      
  8.         
    </
    forwardTo
    >


      
  9.       
    </
    compositeQueue
    >


      
  10.     
    </
    virtualDestinations
    >


      
  11.   
    </
    virtualDestinationInterceptor
    >


      
  12. </
    destinationInterceptors
    >


      
Xml代码
  1. <
    destinationInterceptors
    >
      
  2.   <
    virtualDestinationInterceptor
    >
      
  3.     <
    virtualDestinations
    >
      
  4.       <
    compositeQueue
     
    name
    =
    "MY.QUEUE"
    >
      
  5.         <
    forwardTo
    >
      
  6.           <
    filteredDestination
     
    selector
    =
    "odd = 'yes'"
     
    queue
    =
    "FOO"
    />
      
  7.           <
    filteredDestination
     
    selector
    =
    "i = 5"
     
    topic
    =
    "BAR"
    />
      
  8.         </
    forwardTo
    >
      
  9.       </
    compositeQueue
    >
      
  10.     </
    virtualDestinations
    >
      
  11.   </
    virtualDestinationInterceptor
    >
      
  12. </
    destinationInterceptors
    >
      

 

2.6.6 Mirrored Queues
   
每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual
Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是
为系统中每个queue都进行如此的配置可能会很麻烦。
    ActiveMQ支持Mirrored
Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirrored queue
topic。为了启用Mirrored
Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过
destinationInterceptors设置其它属性,如mirror
topic的前缀,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一个例子:

Xml代码 复制代码
  1. <
    broker


     
    xmlns

    =
    "http://activemq.org/config/1.0"

     
    brokerName

    =
    "MirroredQueuesBroker1"

     
    useMirroredQueues

    =
    "true"

    >


      
  2.   
  3.   
    <
    transportConnectors
    >


      
  4.     
    <
    transportConnector


     
    uri

    =
    "tcp://localhost:61616"

    />


      
  5.   
    </
    transportConnectors
    >


      
  6.      
  7.   
    <
    destinationInterceptors
    >


      
  8.       
    <
    mirroredQueue


     
    copyMessage

     = 
    "true"

     
    prefix

    =
    "Mirror.Topic"

    />


      
  9.   
    </
    destinationInterceptors
    >


      
  10.   ...   
  11. </
    broker
    >


      
Xml代码
  1. <
    broker
     
    xmlns
    =
    "http://activemq.org/config/1.0"
     
    brokerName
    =
    "MirroredQueuesBroker1"
     
    useMirroredQueues
    =
    "true"
    >
      
  2.   
  3.   <
    transportConnectors
    >
      
  4.     <
    transportConnector
     
    uri
    =
    "tcp://localhost:61616"
    />
      
  5.   </
    transportConnectors
    >
      
  6.     
  7.   <
    destinationInterceptors
    >
      
  8.       <
    mirroredQueue
     
    copyMessage
     = 
    "true"
     
    prefix
    =
    "Mirror.Topic"
    />
      
  9.   </
    destinationInterceptors
    >
      
  10.   ...  
  11. </
    broker
    >
      

   
假如某个producer向名为Foo.Bar的queue中发送消息,那么你可以通过订阅名为Mirror.Topic.Foo.Bar的topic来
获得发送到Foo.Bar中的所有消息。 

抱歉!评论已关闭.