现在的位置: 首页 > 综合 > 正文

XAPool连接池源码分析

2013年11月05日 ⁄ 综合 ⁄ 共 18576字 ⁄ 字号 评论关闭

 

 

连接池的结构类图已经在上面详细给出,现在我们来分析一个源码:

  1.      public GenericPool(PoolHelper helper, int minSize, int maxSize,
  2.             long lifeTime, long sleepTime, long maxLifeTime) {
  3.            //连接工厂
  4.         this.threadFactory = null;
  5.         //生命周期
  6.             this.lifeTime = lifeTime;
  7.         //最大生命周期
  8.             this.maxLifeTime = maxLifeTime;
  9.         //最小容量
  10.             this.minSize = minSize;
  11.         //最大容量
  12.             this.maxSize = maxSize;
  13.         //容器中包含对象的生产工厂
  14.             this.poolHelper = helper;
  15.             this.sleepTime = sleepTime;
  16.             this.checkLevelObject = 0;
  17.             this.deadLockMaxWait = DEFAULT_DEADLOCKMAXWAIT;
  18.             this.deadLockRetryWait = DEFAULT_DEADLOCKRETRYWAIT;
  19.         }

连接池的一些基本的参数设定;

  1. public synchronized void start() {
  2.         //已经在使用的对象队列
  3.         locked = new Hashtable(); // create locked objects pool
  4.         //当前可用的对象的队列
  5.         unlocked = new Hashtable(); // create unlocked objects pool
  6.         hitList = new Vector();
  7.         count = 0; // 0 element to start
  8.         gc = false// do not actions concerning garbage collector
  9.         //记录起始时间,开始计算对象生成的时间;
  10.         long now = System.currentTimeMillis(); // current time
  11.         // to obtain to the beginning minSize objects in the pool
  12.         //生成最小的对象个数
  13.         for (int i = 0; i < minSize; i++) { // count have to be equal to minSize
  14.             try {
  15.                 //抽象工厂模式,自定义工厂生产自定义的产品;把具体的工厂和产品解耦;
  16.                 GenerationObject genObject = poolHelper.create();
  17.                 //把数据和时间都都存储起来
  18.                 unlocked.put(genObject, new Long(now));
  19.                 // put it in the unlocked pool
  20.             } catch (Exception e) {
  21.                 log.error("Error Exception in GenericPool:start " + e);
  22.             }
  23.             //把当前连接池所有对象的个数增加;
  24.             ++count; // there is one more element in the pool
  25.         }
  26.         // keeper removes dead or useless objects
  27.         if (threadFactory != null) {
  28.             try {
  29.                 this.poolKeeper = new PoolKeeper(sleepTime, this);
  30.                 this.keeper = threadFactory.getThread(poolKeeper);
  31.             } catch (Exception e) {
  32.                 throw new IllegalStateException(e.getMessage());
  33.             }
  34.         } else {
  35.             // keep a handle to the poolkeeper so we can destroy it later
  36.             this.poolKeeper = new PoolKeeper(sleepTime, this);
  37.             this.keeper = new Thread(poolKeeper);
  38.         }
  39.         //启动一个连接池维护线程;
  40.         keeper.start();
  41.         // start the thread to verify element in the pool(unlocked)
  42.         log.debug("GenericPool:start pool started");
  43.     }

在上面就形成了连接池的一个基本的容器;

  1. public GenerationObject create() throws SQLException {
  2.         return create(getUser(), getPassword());
  3.     }
  4.     public GenerationObject create(String _user, String _password)
  5.         throws SQLException {
  6.         log.debug(
  7.             "StandardPoolDataSource:create create a connection for the pool");
  8.         GenerationObject genObject;
  9.         PooledConnection pooledCon = cpds.getPooledConnection(_user, _password);
  10.         // get the pooled connection
  11.                  //生产的连接绑定到了自身,方便回笼;
  12.         pooledCon.addConnectionEventListener(this);
  13.         // add it to the event listener
  14.         log.debug("StandardPoolDataSource:create create a object for the pool");
  15.         genObject =
  16.             new GenerationObject(
  17.                 pooledCon,
  18.                 pool.getGeneration(),
  19.                 _user,
  20.                 _password);
  21.         return genObject; // return a connection
  22.     }

    下面介绍一下对象的取出和回笼;

  1. public Object checkOut(String user, String password) throws Exception {
  2.         log.debug("GenericPool:checkOut an object");
  3.         //拿到当前的时间;
  4.         long now = System.currentTimeMillis(); // current time to compare
  5.         GenerationObject o;
  6.         Enumeration e;
  7.         Object realObject;
  8.         log.debug("GenericPool:checkOut UnlockedObjectCount="
  9.                 + getUnlockedObjectCount());
  10.         log.debug("GenericPool:checkOut LockedObjectCount="
  11.                 + getLockedObjectCount());
  12.         log
  13.                 .debug("GenericPool:checkOut count=" + count + " maxSize="
  14.                         + maxSize);
  15.         //如果当前存在可用对象
  16.         if (getUnlockedObjectCount() > 0) {
  17.             // if there are objects in the unlocked pool
  18.             if ((checkLevelObject == 3) || (checkLevelObject == 4)) { // need
  19.                 // to
  20.                 // verify
  21.                 // all
  22.                 // the
  23.                 // objects
  24.                //遍历这些对象
  25.                 e = unlocked.keys();
  26.                 while (e.hasMoreElements()) {
  27.                     o = (GenerationObject) e.nextElement();
  28.                     //存在可用的对象
  29.                     realObject = o.getObj(); // take the current object
  30.                     // first, verify if the object is not dead (lifetime)
  31.                     //如果当前元素过期的话
  32.                     Long life = (Long) unlocked.get(o);
  33.                     if (life == null
  34.                         || (now - life.longValue()) > lifeTime
  35.                         || (maxLifeTime > 0 && (now - o.getCreated()) > maxLifeTime)) {
  36.                         
  37.                         // object has expired
  38.                         log.debug("GenericPool:checkOut an object has expired");
  39.                         //从可用的队列中移除
  40.                         removeUnlockedObject(o);
  41.                         // minimumObject(user, password);
  42.                         // build object in the pool if it is lesser than minSize
  43.                     } else {
  44.                         log
  45.                                 .debug("GenericPool:checkOut check the owner of the connection");
  46.                         //检查拥有者
  47.                         if (checkOwner(o, user, password)) {
  48.                             if (((checkLevelObject == 3) && !poolHelper
  49.                                     .checkThisObject(realObject))
  50.                                     || ((checkLevelObject == 4) && !poolHelper
  51.                                             .testThisObject(realObject))) {
  52.                                 log
  53.                                         .debug("GenericPool:checkOut remove object checkLevelObject="
  54.                                                 + checkLevelObject);
  55.                                 //不合法移除;
  56.                                 removeUnlockedObject(o);
  57.                                 // minimumObject(user, password);
  58.                                 // build object in the pool if it is lesser than
  59.                                 // minSize
  60.                             }
  61.                         }
  62.                     }
  63.                 }
  64.             }
  65.         }
  66. int currentWait = 0;
        
            Object obj = getFromPool(user, password);
            while ((obj == null) && (currentWait < getDeadLockMaxWait())) {
                log.info("GenericPool:checkOut waiting for an object :"
                        + this.poolHelper.toString());
                try {
                    synchronized (this) {
                        //当没有对象的时候,等待若干时间
                        wait(getDeadLockRetryWait());
                    }
                } catch (InterruptedException excp) {
                    log
                            .error("GenericPool:checkOut ERROR Failed while waiting for an object: "
                                    + excp);
                }
                currentWait += getDeadLockRetryWait();
                //在重新尝试从池中取出可用对象;
                obj = getFromPool(user, password);
            }

            if (obj == null)
                throw new Exception(
                        "GenericPool:checkOut ERROR  impossible to obtain a new object from the pool");

            return obj;
        }

  1. private Object getFromPool(String user, String password) throws Exception {
  2.         long now = System.currentTimeMillis(); // current time to compare
  3.         if (getUnlockedObjectCount() > 0) {
  4.             // now, we have to return an object to the user
  5.             GenerationObject o = null;
  6.             Object realObject = null;
  7.             Long life = null;
  8.             Enumeration e = unlocked.keys(); // then take them
  9.             while (e.hasMoreElements()) { // for each objects ...
  10.                 synchronized (this) {
  11.                     // Ensure that there are object in the unlocked
  12.                     // collection
  13.                     if (getUnlockedObjectCount() == 0)
  14.                         break;
  15.                     o = (GenerationObject) e.nextElement();
  16.                     life = (Long) unlocked.get(o);
  17.                     if (life == null) {
  18.                         // Fix for #303462; note that this fixes the problem, but Enumeration's on Hashtable's
  19.                         // are by definition somewhat unpredictable; a more robust fix may be in order
  20.                         log.debug("GenericPool:getFromPool fix for #303462 encountered");
  21.                         continue;
  22.                     }
  23.                     //从可用对象的队列中移除
  24.                     unlocked.remove(o);
  25.                     // In any case the object will be removed.
  26.                     // Prevents others accessing the object while we are
  27.                     // not synchronized.
  28.                     //返回其真实对象;
  29.                     realObject = o.getObj();
  30.                 }
  31.                 // Verify that the life object is valid
  32.                 if (life == null)
  33.                     break;
  34.                 // first, verify if the object is not dead (lifetime)
  35.                 if (life == null
  36.                     || (now - life.longValue()) > lifeTime
  37.                     || (maxLifeTime > 0 && (now - o.getCreated()) > maxLifeTime)) {
  38.                     
  39.                     // object has expired
  40.                     log.debug("GenericPool:getFromPool an object has expired");
  41.                     removeUnlockedObject(o);
  42.                 } else {
  43.                     log
  44.                             .debug("GenericPool:getFromPool check the owner of the connection");
  45.                     if (checkOwner(o, user, password)) {
  46.                         log.debug("GenericPool:getFromPool owner is verified");
  47.                         // second, verification of the object if needed
  48.                         if ((checkLevelObject == 0)
  49.                                 || ((checkLevelObject == 1) && poolHelper
  50.                                         .checkThisObject(realObject))
  51.                                 || ((checkLevelObject == 2) && poolHelper
  52.                                         .testThisObject(realObject))) {
  53.                             locked.put(o, new Long(now));
  54.                             // put it in the locked pool
  55.                             log
  56.                                     .debug("GenericPool:getFromPool return an object (after verification if needed)");
  57.                             return (o.getObj()); // return this element
  58.                         } else { // object failed validation
  59.                             //          System.out.println("removeUnlockedObject="+realObject);
  60.                             log
  61.                                     .debug("GenericPool:getFromPool kill an object from the pool");
  62.                             removeUnlockedObject(o);
  63.                         }
  64.                     } else
  65.                         log.debug("GenericPool:getFromPool owner is FALSE");
  66.                 }
  67.             }
  68.         } // if getUnlockedObjectCount() > 0
  69.         // if no objects available, create a new one
  70.         boolean create = false;
  71.         //若是已有对象总和没有超过最大容量,可以自己新建对象
  72.         synchronized (this) {
  73.             if (count < maxSize) {
  74.                 create = true;
  75.                 count++; // assume we can create a connection.
  76.             }
  77.         }
  78.         if (create) {
  79.             //          System.out.println("on doit creer une connection");
  80.             // if number of pooled object is < max size of the pool
  81.             log
  82.                     .debug("GenericPool:getFromPool no objects available, create a new one");
  83.             try {
  84.                 //          System.out.println("on doit creer une connection CREATE");
  85.                 GenerationObject genObject = poolHelper.create(user, password);
  86.                 //          System.out.println("nouvel objet="+genObject.getObj());
  87.                 //          System.out.println("on doit creer une connection PUT");
  88.                 locked.put(genObject, new Long(now));
  89.                 // put it in the locked pool
  90.                 return (genObject.getObj()); // and return this element
  91.             } catch (Exception excp) {
  92.                 synchronized (this) {
  93.                     count--; // our assumption failed. rollback.
  94.                 }
  95.                 log
  96.                         .error("GenericPool:getFromPool Error Exception in GenericPool:getFromPool");
  97.                 // cney: rethrow exception thrown by create
  98.                 throw excp;
  99.             }
  100.         }
  101.         //否则返回空,继续等待
  102.         return null;
  103.     }

从连接池中回笼;

  1. public synchronized void checkIn(Object o) {
  2.         log.debug("GenericPool:checkIn return an object to the pool");
  3.         for (Enumeration enumeration = locked.keys(); enumeration.hasMoreElements();) { // for
  4.             // each
  5.             // object
  6.             // of
  7.             GenerationObject obj = (GenerationObject) enumeration.nextElement();
  8.             // the locked pool
  9.             if (obj.getObj().equals(o)) {
  10.                 //从锁定的对象从锁定队列中移入可用队列中
  11.                 locked.remove(obj); // remove the object from the locked pool
  12.                 unlocked.put(obj, new Long(System.currentTimeMillis()));
  13.                 // we have to verify if the generation of the object is still
  14.                 // valid
  15.                 int genObj = obj.getGeneration(); // get the generation number
  16.                 // if the generation number of the object is not valid, test the
  17.                 // object
  18.                 if (generation > genObj) {
  19.                     if (!poolHelper.checkThisObject(obj.getObj()))
  20.                         // if the object is not valid
  21.                         removeUnlockedObject(obj);
  22.                 }
  23.                 //唤醒线程
  24.                 notifyAll();
  25.             }
  26.         }

我们再来看一下连接池的清理线程;

  1. public class PoolKeeper implements Runnable {
  2.     private long sleepTime; // timeout between 2 clean up
  3.     private GenericPool pool; // the pool to clean up
  4.     private boolean running = true;
  5.     /**
  6.      * constructor, called by GenericPool (any kind of object)
  7.      */
  8.     public PoolKeeper(long sleepTime, GenericPool pool) {
  9.         this.sleepTime = sleepTime;
  10.         this.pool = pool;
  11.     }
  12.     public void stop() {
  13.         synchronized (this ) {
  14.                         running = false;
  15.                 }
  16.     }
  17.     /**
  18.      * run method. allows to clean up the pool
  19.      */
  20.     public void run() {
  21.         while (running && !Thread.interrupted()) {
  22.             try {
  23.                 synchronized (this) {
  24.                     wait(this.sleepTime); // wait for timeout ms before attack
  25.                 }
  26.             } catch (InterruptedException e) {
  27.                                 break;
  28.             }
  29.                         //特定时间间隔的清扫连接池;
  30.             this.pool.cleanUp(); // clean up the Pool and reallocate objects
  31.         }
  32.         // release the pool.
  33.         this.pool = null;
  34.     }

在clearUp中:

  1. long now = System.currentTimeMillis(); // current time
  2.         synchronized (this) {
  3.             for (Enumeration enumeration = unlocked.keys(); enumeration.hasMoreElements();) { // for
  4.                 // each
  5.                 // object
  6.                 // of
  7.                 // the
  8.                 GenerationObject o = (GenerationObject) enumeration.nextElement();
  9.                 // unlocked pool
  10.                 
  11.                 Long lasttouch = (Long) unlocked.get(o);
  12.                 // birth day of the pool
  13.                 if (lasttouch == null
  14.                     || (now - lasttouch.longValue()) > lifeTime
  15.                     || (maxLifeTime > 0 && (now - o.getCreated()) > maxLifeTime)) {
  16.                     log.debug("GenericPool:cleanUp clean up the pool");
  17.                     removeUnlockedObject(o);
  18.                 }
  19.             }
  20.         }

用当前时间与创建时间的间隔和生命周期做比较,看是否过期,过期杀死;

  1. public void expire(Object o) {
  2.         log.debug(
  3.             "StandardPoolDataSource:expire expire a connection, remove from the pool");
  4.         if (o == null)
  5.             return;
  6.         try {
  7.             PooledConnection pooledCon = (PooledConnection) o;
  8.             pooledCon.close(); // call close() of PooledConnection
  9.             pooledCon.removeConnectionEventListener(this);
  10.             log.debug("StandardPoolDataSource:expire close the connection");
  11.         } catch (java.sql.SQLException e) {
  12.             log.error(
  13.                 "StandardPoolDataSource:expire Error java.sql.SQLException in StandardPoolDataSource:expire");
  14.         }
  15.     }

我们再来看一下数据源的如何获取连接;

  1. public Connection getConnection(String _user, String _password)
  2.         throws SQLException {
  3.         log.debug("StandardPoolDataSource:getConnection");
  4.         Connection ret = null;
  5.         PooledConnection con = null;
  6.         synchronized (this) {
  7.             if (!onOff) {
  8.                 log.debug(
  9.                     "StandardPoolDataSource:getConnection must configure the pool...");
  10.                 pool.start(); // the pool starts now
  11.                 onOff = true// and is initialized
  12.                 log.debug(
  13.                     "StandardPoolDataSource:getConnection pool config : /n"
  14.                         + pool.toString());
  15.             }
  16.         }
  17.         try {
  18.             try {
  19.                 log.debug(
  20.                     "StandardPoolDataSource:getConnection Try to give a "
  21.                         + "connection (checkOut)");
  22.                 //从连接池中取得PooledConnection;
  23.                 con = (PooledConnection) pool.checkOut(_user, _password);
  24.                 // get a connection from the pool
  25.                 log.debug(
  26.                     "StandardPoolDataSource:getConnection checkOut return"
  27.                         + "a new connection");
  28.             } catch (Exception e) {
  29.                                 e.printStackTrace();
  30.                 log.debug(
  31.                     "StandardPoolDataSource:getConnection SQLException in StandardPoolDataSource:getConnection"
  32.                         + e);
  33.                 throw new SQLException(
  34.                     "SQLException in StandardPoolDataSource:getConnection no connection available "
  35.                         + e);
  36.             }
  37.             //取得物理连接
  38.             ret = con.getConnection();
  39.         } catch (Exception e) {
  40.             log.debug("StandardPoolDataSource:getConnection exception" + e);
  41.                         e.printStackTrace();
  42.             SQLException sqle =
  43.                 new SQLException(
  44.                     "SQLException in StandardPoolDataSource:getConnection exception: "
  45.                         + e);
  46.             if (e instanceof SQLException)
  47.                 sqle.setNextException((SQLException) e);
  48.             if (con != null) {
  49.                 pool.checkIn(con);
  50.             }
  51.             throw sqle;
  52.         }
  53.         log.debug("StandardPoolDataSource:getConnection return a connection");
  54.         return ret;
  55.     }

分布式事务的分析会留到下一节,在这一节中,有几个地方值得学习:

(1)工厂和监听者集为一身,在场景,如产品还要工厂进行同意的售后服务等等;

 (2)线程扫描,时间间隔等等;

抱歉!评论已关闭.