现在的位置: 首页 > 综合 > 正文

DefaultMessageListenerContainer事务相关问题

2018年08月28日 ⁄ 综合 ⁄ 共 1552字 ⁄ 字号 评论关闭

使用DefaultMessageListenerContainer作为消息接收器,典型的配置如下:

<bean id="queueListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="destination" ref="queueIn" /><!-- 接收队列 -->
        <property name="concurrentConsumers" value="3" /><!-- 控制同时启几个concurrent listener threads -->
        <property name="messageListener" ref="messageReceiver" />
        <property name="transactionManager" ref="jmsTransactionManager" />
        <property name="sessionTransacted" value="true" />
</bean>

<bean id="jmsTransactionManager"
        class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="mqConnectionFactory" />

</bean>

其中的JmsTransactionManager会将messageReceiver的onMessage方法当作一个事务,如果后续的消息处理程序耗时很长,比如我的系统就有一个耗时n个小时的消息处理程序,这样的话,onMessage方法作为一个事务就一直无法提交了,表现为消息队列中的消息是假的被取走了,也就是说这时候看到的队列中消息数量减少了一个,但是当事务回滚了或者进程意外终止,那个被取走的消息又回到队列中了,这也正是事务的作用。如果这个时候同时收到并正在处理的消息已经达到concurrentConsumers的数量,那么消息接收器DefaultMessageListenerContainer就不会再响应新的消息了。

(我曾经在论坛发过的相关问题:http://topic.csdn.net/u/20110301/16/8e73219d-667d-455d-87eb-43c0853f6f0b.html)

如果你的应用是将整个接收到处理完的过程作为一个事务,那么就需要按照原本的做法;如果你不希望因为事务处理时间过长而影响消息接收,那么可以在接收到消息后另外启动一个线程来做消息处理,这样的话onMessage方法的这个事务在启动了消息处理的线程之后就结束了。

在onMessage方法里:

1、作为一整个事务:

notifyObservers(messageStr);//一般都会将消息处理实现为Observer模式

2、单独启动一个线程:

Thread t = new Thread(new Runnable() {
               @Override
                public void run() {
                    // TODO Auto-generated method stub
                    notifyObservers(messageStr);
                }
});
t.start();

抱歉!评论已关闭.