现在的位置: 首页 > 综合 > 正文

activemq的连接池,通过spring的JmsTemplate发送消息到指定的Destination

2013年04月24日 ⁄ 综合 ⁄ 共 6360字 ⁄ 字号 评论关闭

我们使用jms一般是使用spring-jms和activemq相结合,通过spring的JmsTemplate发送消息到指定的Destination。

 

    首先定义一个activemq的连接池:

 

Xml代码
复制代码 收藏代码
  1. <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  
  2.     destroy-method="stop">  
  3.     <property name="connectionFactory">  
  4.         <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
  5.             <property name="brokerURL"  
  6.                 value="failover:(tcp://192.168.20.23:61616?wireFormat.maxInactivityDuration=0)&amp;maxReconnectDelay=1000" />  
  7.         </bean>  
  8.     </property>  
  9.     <property name="maxConnections" value="1"></property>  
  10. </bean>  
 

定义jmsTempalte的实例:

 

Xml代码
复制代码 收藏代码
  1. <bean id="oamTmpTopic" class="org.apache.activemq.command.ActiveMQTopic">  
  2.     <constructor-arg value="oamTmpTopic" />  
  3. </bean>  
  4.   
  5. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  6.     <property name="connectionFactory" ref="connectionFactory" />  
  7.     <property name="defaultDestination" ref="oamTmpTopic" />  
  8.     <property name="explicitQosEnabled" value="true" />  
  9.     <property name="deliveryMode" value="1" />  
  10. </bean>  
定义生产者SendMessage.java:

 

Java代码
复制代码 收藏代码
  1. import javax.jms.JMSException;   
  2. import javax.jms.Message;   
  3. import javax.jms.Session;   
  4. import javax.jms.TextMessage;   
  5. import javax.jms.Topic;   
  6.   
  7. import org.springframework.jms.core.JmsTemplate;   
  8. import org.springframework.jms.core.MessageCreator;   
  9.   
  10. public class SendMessage {   
  11.   
  12.     private JmsTemplate jmsTemplate;   
  13.   
  14.     private String topicName;   
  15.   
  16.     private Topic topic;   
  17.   
  18.     public void setJmsTemplate(JmsTemplate jmsTemplate) {
      
  19.         this.jmsTemplate jmsTemplate;   
  20.     }   
  21.   
  22.     public void setTopicName(String topicName) {
      
  23.         this.topicName topicName;   
  24.     }   
  25.   
  26.     public void sendMessage(final String message) {
      
  27.   
  28.         try {   
  29.             if (topic == null{
      
  30.                 topic jmsTemplate.getConnectionFactory().createConnection()   
  31.                         .createSession(falseSession.AUTO_ACKNOWLEDGE)
      
  32.                         .createTopic(topicName);   
  33.             }   
  34.             jmsTemplate.send(topic,new MessageCreator() {
      
  35.   
  36.                 @Override  
  37.                 public Message createMessage(Session session)
      
  38.                         throws JMSException {
      
  39.   
  40.                     TextMessage textMessage session   
  41.                             .createTextMessage(message);
      
  42.                     return textMessage;
      
  43.                 }   
  44.             });   
  45.         catch (JMSException e) {   
  46.             e.printStackTrace();   
  47.         }   
  48.     }   
  49.  
 

定义消费者TestListener.java:

 

Java代码
复制代码 收藏代码
  1. import javax.jms.JMSException;   
  2. import javax.jms.Message;   
  3. import javax.jms.MessageListener;   
  4. import javax.jms.Session;   
  5. import javax.jms.Topic;   
  6.   
  7. import org.springframework.jms.core.JmsTemplate;   
  8. import org.springframework.jms.listener.DefaultMessageListenerContainer;   
  9.   
  10. public class TestListener implements MessageListener{
      
  11.        
  12.     private JmsTemplate jmsTemplate;   
  13.        
  14.     private String topicName;   
  15.        
  16.     public TestListener(JmsTemplate jmsTemplate,String topicName){   
  17.            
  18.         this.jmsTemplate jmsTemplate;   
  19.            
  20.         this.topicName topicName;   
  21.            
  22.         Topic topic;   
  23.         try {   
  24.             topic this.jmsTemplate.getConnectionFactory().createConnection().createSession(false,
      
  25.                     Session.AUTO_ACKNOWLEDGE).createTopic(this.topicName);
      
  26.                
  27.             DefaultMessageListenerContainer dmc new DefaultMessageListenerContainer();
      
  28.             dmc.setPubSubDomain(true);   
  29.             dmc.setDestination(topic);   
  30.             dmc.setConnectionFactory(this.jmsTemplate.getConnectionFactory());
      
  31.             dmc.setPubSubNoLocal(true);   
  32.             dmc.setMessageListener(this);   
  33.             dmc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);   
  34.             dmc.initialize();   
  35.             dmc.start();   
  36.         catch (JMSException e) {   
  37.             e.printStackTrace();   
  38.         }   
  39.     }   
  40.   
  41.     @Override  
  42.     public void onMessage(Message message) {
      
  43.            
  44.         System.out.println(message);   
  45.     }   
  46.   
  47.  
然后在spring的配置文件中定义相关的bean:

 

Xml代码
复制代码 收藏代码
  1. <bean id="testListener" class="net.kentop.test.jms.TestListener">  
  2. <constructor-arg ref="jmsTemplate"></constructor-arg>  
  3. <constructor-arg value="testTopic"></constructor-arg>  
  4. </bean>  
  5.   
  6. <bean id="sendMessage" class="net.kentop.test.jms.SendMessage">  
  7. <property name="jmsTemplate" ref="jmsTemplate"></property>  
  8. <property name="topicName" value="testTopic"></property>  
  9. </bean>  
编写测试代码BeanTest.java:

 

Java代码
复制代码 收藏代码
  1. import org.springframework.context.ApplicationContext;   
  2. import org.springframework.context.support.ClassPathXmlApplicationContext;   
  3.   
  4. public class BeanTest {   
  5.   
  6.     public static ApplicationContext context new ClassPathXmlApplicationContext("infrastructure-config.xml");
      
  7.        
  8.     public static void main(String args[]){
      
  9.            
  10.         SendMessage sendMessage (SendMessage) context.getBean("sendMessage");
      
  11.            
  12.         sendMessage.sendMessage("hahahha,我来测试了");   
  13.         sendMessage.sendMessage("dfsdfsfsdfsdfsdf");   
  14.         sendMessage.sendMessage("come on baby!");   
  15.         sendMessage.sendMessage("hahahha,我来测试了2");   
  16.         sendMessage.sendMessage("dfsdfsfsdfsdfsdf2");   
  17.         sendMessage.sendMessage("come on baby!2");   
  18.         sendMessage.sendMessage("hahahha,我来测试了3");   
  19.         sendMessage.sendMessage("dfsdfsfsdfsdfsdf3");   
  20.         sendMessage.sendMessage("come on baby!3");   
  21.         sendMessage.sendMessage("hahahha,我来测试了4");   
  22.         sendMessage.sendMessage("dfsdfsfsdfsdfsdf4");   
  23.         sendMessage.sendMessage("come on baby!4");   
  24.     }   
  25.  
 

    但是这个时候会发现,消费者是无法接收到消费者消息的。因为我们在定义消费者时,定义了以下的代码:

 

Java代码
复制代码 收藏代码
  1. DefaultMessageListenerContainer dmc new DefaultMessageListenerContainer();
      
  2. dmc.setPubSubDomain(true);   
  3. dmc.setDestination(topic);   
  4. dmc.setConnectionFactory(this.jmsTemplate2.getConnectionFactory());    
  5.                      dmc.setPubSubNoLocal(true);           
      
  6.                      dmc.setMessageListener(this);
      
  7. dmc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);   
  8. dmc.initialize();   
  9. dmc.start();  
   上面的代码中的:

 

Java代码
复制代码 收藏代码
  1. dmc.setPubSubNoLocal(true);  
 

   
当设置pubSubNoLocal为true时,消费者不会接收来自同一个连接的消息。因为我们在上面的配置文件中定义了连接池的最大连接数为1,因此每次使用的连接都是同一个连接,所以就消费者就接收不到消息。只有当pubSubNoLocal为false时,消费者才能接收到来自同一个连接的消息。

 

    当然,也可以设置连接池的最大连接数为多个,比如为10,这样就可能不会每次都是用同一个连接,消费者也可以接收到消息。但是这样的话,不是每个消息都可以接收到,因为这样的话不排除有时候消费者和生产者有使用同一个连接的可能。如果一定要设置pubSubNoLocal为true的话,那么就必须要使用不同的连接。

 

    在这里也要注意的是:

 

Java代码
复制代码

抱歉!评论已关闭.