现在的位置: 首页 > 综合 > 正文

activemq 重新连接的机制

2018年08月11日 ⁄ 综合 ⁄ 共 3064字 ⁄ 字号 评论关闭

最近一个项目要用到ActiveMq,并且需要最大程度的保证消息不丢失。以前对activeMq不是很熟悉,完全是摸着石头过河,目前基
本配置都搞定了。只是对于它的自动重连一直找不到好的解决办法,我希望的效果是当一个broker(假设只有这一个,没有备用的)如果异常down掉的
话,那么监听程序能够等待broker重启后再自动重新连接。看了它的文档似乎
设置一下failover:(tcp://localhost:61616) 就可以了

 

Failover Transport
    Failover
Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。
Failover
transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:
   
failover:(uri1,...,uriN)?transportOptions
   
failover:uri1,...,uriN
    Transport Options的可选值如下:
Option Name
Default Value Description
initialReconnectDelay 10 How long to wait
before the first reconnect attempt (in ms)
maxReconnectDelay 30000
The maximum amount of time we ever wait between reconnect attempts (in
ms)
useExponentialBackOff true Should an exponential backoff be used
between reconnect attempts
backOffMultiplier 2 The exponent used in
the exponential backoff attempts
maxReconnectAttempts 0 If not 0,
then this is the maximum number of reconnect attempts before an error is
sent back to the client
randomize true use a random algorithm to
choose the URI to use for reconnect from the list provided
backup
false initialize and hold a second transport connection - to enable fast
failover
  
例如:failover:(tcp://localhost:61616,tcp:
//remotehost:61616)?initialReconnectDelay=100

2.2.4 Discovery
transport
    Discovery transport是可靠的tranport。它使用Discovery
transport来定位用来连接的URI列表。以下是配置语法:
   
discovery:(discoveryAgentURI)?transportOptions
   
discovery:discoveryAgentURI
    Transport Options的可选值如下:
Option
Name Default Value Description
initialReconnectDelay 10 How long to
wait before the first reconnect attempt
maxReconnectDelay 30000 The
maximum amount of time we ever wait between reconnect attempts
useExponentialBackOff
true Should an exponential backoff be used btween reconnect attempts
backOffMultiplier
2 The exponent used in the exponential backoff attempts
maxReconnectAttempts
0 If not 0, then this is the maximum number of reconnect attempts
before an error is sent back to the client
  
例如:discovery:(multicast://default)?initialReconnectDelay=100  
   
为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:
Xml
代码 
1. <broker name="foo"> 
2.   
<transportConnectors> 
3.       <transportConnector
uri="tcp://localhost:0" discoveryUri="multicast://default"/> 
4.
    </transportConnectors> 
5.     ... 
6.
</broker> 
   在使用Failover Transport或Discovery
transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message
Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个
broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker

的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限
制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。
  
在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:
Java代码 
1.
(ActiveMQConnection)connection).addTransportListener(new
TransportListener() { 
2.     public void onCommand(Object cmd) { 
3.
    } 
4.  
5.     public void onException(IOException exp) { 
6.
    } 
7.  
8.     public void transportInterupted() { 
9.
        // The transport has suffered an interruption from which it
hopes to recover. 
10.     } 
11.  
12.     public void
transportResumed() { 
13.         // The transport has resumed after
an interruption. 
14.     } 
15. }); 

---------------------

我的做法是
不要在服务器端设置,而在本地设置:failover:
(tcp://192.168.0.245:61616?wireFormat.maxInactivityDuration=0)

抱歉!评论已关闭.