我们使用jms一般是使用spring-jms和activemq相结合,通过spring的JmsTemplate发送消息到指定的Destination。
- <bean
id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" -
-
name="connectionFactory"> -
class="org.apache.activemq.ActiveMQConnectionFactor y" > -
name="brokerURL" -
/> -
-
-
name="maxConnections" value="1"></property> - </bean>
定义jmsTempalte的实例:
- <bean
id="oamTmpTopic" class="org.apache.activemq.command.ActiveMQTopic"> -
value="oamTmpTopic" /> - </bean>
-
- <bean
id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> -
name="connectionFactory" ref="connectionFactory" /> -
name="defaultDestination" ref="oamTmpTopic" /> -
name="explicitQosEnabled" value="true" /> -
name="deliveryMode" value="1" /> - </bean>
定义生产者SendMessage.java:
- import
javax.jms.JMSException; - import
javax.jms.Message; - import
javax.jms.Session; - import
javax.jms.TextMessage; - import
javax.jms.Topic; -
- import
org.springframework.jms.core.JmsTemplate; - import
org.springframework.jms.core.MessageCreator; -
- public
class SendMessage { -
-
JmsTemplate jmsTemplate; -
-
String topicName; -
-
Topic topic; -
-
void setJmsTemplate(JmsTemplate jmsTemplate) {
-
= jmsTemplate; -
} -
-
void setTopicName(String topicName) {
-
= topicName; -
} -
-
void sendMessage( finalString message) {
-
-
{ -
(topic null)== {
-
topic = jmsTemplate.getConnectionFactory().createConnection() -
.createSession( Session.AUTO_ACKNOWLEDGE)
-
.createTopic(topicName); -
} -
jmsTemplate.send(topic, MessageCreator() {
-
-
-
Message createMessage(Session session)
-
JMSException {
-
-
TextMessage textMessage = session -
.createTextMessage(message);
-
textMessage;
-
} -
}); -
} (JMSException e) { -
e.printStackTrace(); -
} -
} - }
定义消费者TestListener.java:
- import
javax.jms.JMSException; - import
javax.jms.Message; - import
javax.jms.MessageListener; - import
javax.jms.Session; - import
javax.jms.Topic; -
- import
org.springframework.jms.core.JmsTemplate; - import
org.springframework.jms.listener.DefaultMessageListenerCo ntainer; -
- public
class TestListener implementsMessageListener{
-
-
JmsTemplate jmsTemplate; -
-
String topicName; -
-
TestListener(JmsTemplate jmsTemplate,String topicName){ -
-
= jmsTemplate; -
-
= topicName; -
-
Topic topic; -
{ -
topic =
-
Session.AUTO_ACKNOWLEDGE).createTopic(
-
-
DefaultMessageListenerCo ntainer dmc = DefaultMessageListenerCo ntainer();
-
dmc.setPubSubDomain( -
dmc.setDestination(topic); -
dmc.setConnectionFactory(
-
dmc.setPubSubNoLocal( -
dmc.setMessageListener( -
dmc.setSessionAcknowledgeMod e(Session.AUTO_ACKNOWLEDGE); -
dmc.initialize(); -
dmc.start(); -
} (JMSException e) { -
e.printStackTrace(); -
} -
} -
-
-
void onMessage(Message message) {
-
-
System.out.println(message); -
} -
- }
然后在spring的配置文件中定义相关的bean:
- <bean
id="testListener" class="net.kentop.test.jms.TestListener"> - <constructor-arg
ref="jmsTemplate"></constructor-arg> - <constructor-arg
value="testTopic"></constructor-arg> - </bean>
-
- <bean
id="sendMessage" class="net.kentop.test.jms.SendMessage"> - <property
name="jmsTemplate" ref="jmsTemplate"></property> - <property
name="topicName" value="testTopic"></property> - </bean>
编写测试代码BeanTest.java:
- import
org.springframework.context.ApplicationContext; - import
org.springframework.context.support.ClassPathXmlApplicationC ontext; -
- public
class BeanTest { -
-
static ApplicationContext newcontext = ClassPathXmlApplicationC "infrastructure-config.xml");ontext(
-
-
static void main(String args[]){
-
-
SendMessage sendMessage = (SendMessage) context.getBean(
-
-
sendMessage.sendMessage( -
sendMessage.sendMessage( -
sendMessage.sendMessage( on );baby!" -
sendMessage.sendMessage( -
sendMessage.sendMessage( -
sendMessage.sendMessage( on );baby!2" -
sendMessage.sendMessage( -
sendMessage.sendMessage( -
sendMessage.sendMessage( on );baby!3" -
sendMessage.sendMessage( -
sendMessage.sendMessage( -
sendMessage.sendMessage( on );baby!4" -
} - }
- DefaultMessageListenerCo
ntainer newdmc = DefaultMessageListenerCo ntainer();
- dmc.setPubSubDomain(true);
- dmc.setDestination(topic);
- dmc.setConnectionFactory(this.jmsTemplate2.getConnectionFactory());
-
dmc.setPubSubNoLocal(
-
dmc.setMessageListener(
- dmc.setSessionAcknowledgeMod
e(Session.AUTO_ACKNOWLEDGE);
- dmc.initialize();
- dmc.start();
当设置pubSubNoLocal为true时,消费者不会接收来自同一个连接的消息。因为我们在上面的配置文件中定义了连接池的最大连接数为1,因此每次使用的连接都是同一个连接,所以就消费者就接收不到消息。只有当pubSubNoLocal为false时,消费者才能接收到来自同一个连接的消息。