现在的位置: 首页 > 综合 > 正文

Memcached 集群,客户端自动Hash到不同服务器的实现

2012年08月20日 ⁄ 综合 ⁄ 共 4525字 ⁄ 字号 评论关闭

首先分析一下Java client 启动时的部分代码 

Memcached 支持直接设置多个servers属性 来实现多个memcahced均衡,对应还有一个属性是weights,字面意思就是权重,分析了一下代码,和我想的是一样的 

启动memcached的代码通常是这样的 

Java代码 
  1. SockIOPool pool = SockIOPool.getInstance(poolname);  
  2. pool.setServers(servers);  
  3. pool.setWeights(weights);  
  4. pool.setInitConn(initConn);  
  5. pool.setMinConn(minConn);  
  6. pool.setMaxConn(maxConn);  
  7. pool.setMaxIdle(maxIdle);  
  8. pool.setMaxBusyTime(maxBusyTime);  
  9. pool.setMaintSleep(maintSleep);  
  10. pool.setSocketTO(socketTO);  
  11. pool.setSocketConnectTO(socketConnectTO);  
  12. pool.setNagle(nagle);  
  13. pool.setHashingAlg(SockIOPool.NEW_COMPAT_HASH);  
  14. pool.initialize();  
  15. MemCachedClient client = new MemCachedClient(poolname);  

servers 和 weights 都是一个数组,就是说可以同时设置多个server 

然后看一下 pool.initialize() 做了什么 

Java代码 
  1. availPool   = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );  
  2. busyPool    = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );  
  3. deadPool    = new IdentityHashMap<SockIO,Integer>();  
  4. hostDeadDur = new HashMap<String,Long>();  
  5. hostDead    = new HashMap<String,Date>();  
  6. maxCreate   = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier;   // only create up to maxCreate connections at once  
  7. if ( log.isDebugEnabled() ) {  
  8.     log.debug( "++++ initializing pool with following settings:" );  
  9.     log.debug( "++++ initial size: " + initConn );  
  10.     log.debug( "++++ min spare   : " + minConn );  
  11.     log.debug( "++++ max spare   : " + maxConn );  
  12. }  
  13. // if servers is not set, or it empty, then  
  14. // throw a runtime exception  
  15. if ( servers == null || servers.length <= 0 ) {  
  16.     log.error( "++++ trying to initialize with no servers" );  
  17.     throw new IllegalStateException( "++++ trying to initialize with no servers" );  
  18. }  
  19.     // initalize our internal hashing structures  
  20.         if ( this.hashingAlg == CONSISTENT_HASH )  
  21.         populateConsistentBuckets();  
  22.     else  
  23.             populateBuckets();  

看到这里就是开辟一些连接池的空间,然后调用了根据我们选择的hash 算法 执行populateBuckets();或者populateConsistentBuckets(); 

hash算法共有4种 

Java代码 
  1. // native String.hashCode();  
  2. public static final int NATIVE_HASH     = 0;      
  3. // original compatibility hashing algorithm (works with other clients)  
  4. public static final int OLD_COMPAT_HASH = 1;  
  5. // new CRC32 based compatibility hashing algorithm (works with other clients)     
  6. public static final int NEW_COMPAT_HASH = 2;  
  7. // MD5 Based -- Stops thrashing when a server added or removed  
  8. public static final int CONSISTENT_HASH = 3;          

我们通常用的是 NEW_COMPAT_HASH,这个保证可以wokrs with other clients 

所以看一下populateBuckets()做了什么 

Java代码 
  1. this.buckets = new ArrayList<String>();  
  2. for ( int i = 0; i < servers.length; i++ ) {  
  3.   if ( this.weights != null && this.weights.length > i ) {  
  4.      for ( int k = 0; k < this.weights[i].intValue(); k++ ) {  
  5.      this.buckets.add( servers[i] );  
  6.      if ( log.isDebugEnabled() )  
  7.         log.debug( "++++ added " + servers[i] + " to server bucket" );  
  8.      }  
  9.   }  
  10.   else {  
  11.      this.buckets.add( servers[i] );  
  12.   }  
  13.      // create initial connections  
  14.   for ( int j = 0; j < initConn; j++ ) {  
  15.           SockIO socket = createSocket( servers[i] );  
  16.       if ( socket == null ) {  
  17.         break;  
  18.           }  
  19.           addSocketToPool( availPool, servers[i], socket );  
  20.   }  
  21. }  

假如我们设置的servers是 192.168.0.1:44444和192.168.0.2:22222 
然后我们设置了weights是  5和3 那么 
buckets list的值最终会是 

192.168.0.1:44444, 
192.168.0.1:44444, 
192.168.0.1:44444, 
192.168.0.1:44444, 
192.168.0.1:44444, 
192.168.0.2:22222, 
192.168.0.2:22222. 
192.168.0.2:22222. 

然后就开始根据initCon初始连接数按servers分别创建socket 
那么究竟这个buckets做什么用呢? 

在我们使用set存放对象时会调用 

Java代码 
  1. SockIOPool.SockIO sock = pool.getSock( key, hashCode );  

看一看pool.getSock的代码 

Java代码 
  1. // get initial bucket  
  2. long bucket = getBucket( key, hashCode );  
  3. String server = ( this.hashingAlg == CONSISTENT_HASH )  
  4.         ? consistentBuckets.get( bucket )  
  5.         : buckets.get( (int)bucket );  

其中有段代码是这样的,看看getBucket 

Java代码 
  1. private long getBucket( String key, Integer hashCode ) {  
  2.     long hc = getHash( key, hashCode );  
  3.     if ( this.hashingAlg == CONSISTENT_HASH ) {  
  4.         return findPointFor( hc );  
  5.     }  
  6.     else {  
  7.          long bucket = hc % buckets.size();  
  8.         if ( bucket < 0 ) bucket *= -1;  
  9.             return bucket;  
  10.     }  
  11. }  

先不管key和hashCode,我们看到首先算出一个hc值后会直接做hc%buckets.size()实际上就是根据buckets的数量散列,最终值一定是buckets.size()范围里的一个值 
然后最终server值就根据buckets.get( (int)bucket )得到,那么假如我们得到bucket是3,则参照上面buckets 里的值,得到 list.get(3)=192.168.0.1:44444 所以会根据weight设置的值的不同得到不同的server ,如果 weights设置10:1 那buckets里就是10个相同的server和另一个不同的,将来散列得到的server很大可能性是servers里设置的第一个server。 

 

原文地址:http://bachmozart.javaeye.com/blog/211836

抱歉!评论已关闭.