现在的位置: 首页 > 综合 > 正文

服务器端利器–双缓冲队列

2013年12月07日 ⁄ 综合 ⁄ 共 5769字 ⁄ 字号 评论关闭

http://rrsongzi-gmail-com.iteye.com/blog/696627

传统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访 问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写 队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。

经过测试性能较JDK自带的queue的确有不小提高。


测试是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比较,这个队列是环形队列的实现方式,性能还算不错,不过我们的目标是没有最好,只有更好。
测试场景:
起若干个生产者线程,往Queue中放数据,起若干个消费者线程从queue中取数据,统计每个消费者线程取N个数据的平均时间。
数据如下:
场景1 
生产者线程数:1
消费者线程数:1
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为:  5,302,938,177纳秒
双缓冲队列用时平均为:                      5,146,302,116纳秒
相差大概160毫秒

场景2: 
生产者线程数:5
消费者线程数:4
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为:  32,824,744,868纳秒
双缓冲队列用时平均为:                      20,508,495,221纳秒
相差大概12.3秒

可见在生产者消费者都只有一个的时候存和取的同步冲突比较小,双缓冲队列
优势不是很大,当存取线程比较多的时候优势就很明显了。

 

队列主要方法如下:

Java代码  收藏代码
  1. <span style="font-size: small;">/** 
  2.  *  
  3.  * CircularDoubleBufferedQueue.java 
  4.  * 囧囧有神 
  5.  * @param <E>2010-6-12 
  6.  */  
  7. public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>  
  8. implements BlockingQueue<E>, java.io.Serializable  
  9. {  
  10.     private static final long serialVersionUID = 1L;  
  11.     private Logger logger =   Logger.getLogger(CircularDoubleBufferedQueue.class.getName());  
  12.   
  13.     /** The queued items  */  
  14.     private final E[] itemsA;  
  15.     private final E[] itemsB;  
  16.       
  17.     private ReentrantLock readLock, writeLock;  
  18.     private Condition notEmpty;  
  19.     private Condition notFull;  
  20.     private Condition awake;  
  21.       
  22.       
  23.     private E[] writeArray, readArray;  
  24.     private volatile int writeCount, readCount;  
  25.     private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;  
  26.       
  27.       
  28.     public CircularDoubleBufferedQueue(int capacity)  
  29.     {  
  30.         if(capacity<=0)  
  31.         {  
  32.             throw new IllegalArgumentException("Queue initial capacity can't less than 0!");  
  33.         }  
  34.           
  35.         itemsA = (E[])new Object[capacity];  
  36.         itemsB = (E[])new Object[capacity];  
  37.   
  38.         readLock = new ReentrantLock();  
  39.         writeLock = new ReentrantLock();  
  40.           
  41.         notEmpty = readLock.newCondition();  
  42.         notFull = writeLock.newCondition();  
  43.         awake = writeLock.newCondition();  
  44.           
  45.         readArray = itemsA;  
  46.         writeArray = itemsB;  
  47.     }  
  48.       
  49.     private void insert(E e)  
  50.     {  
  51.         writeArray[writeArrayTP] = e;  
  52.         ++writeArrayTP;  
  53.         ++writeCount;  
  54.     }  
  55.       
  56.     private E extract()  
  57.     {  
  58.         E e = readArray[readArrayHP];  
  59.         readArray[readArrayHP] = null;  
  60.         ++readArrayHP;  
  61.         --readCount;  
  62.         return e;  
  63.     }  
  64.   
  65.       
  66.     /** 
  67.      *switch condition:  
  68.      *read queue is empty && write queue is not empty 
  69.      *  
  70.      *Notice:This function can only be invoked after readLock is  
  71.          * grabbed,or may cause dead lock 
  72.      * @param timeout 
  73.      * @param isInfinite: whether need to wait forever until some other 
  74.      * thread awake it 
  75.      * @return 
  76.      * @throws InterruptedException 
  77.      */  
  78.     private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException  
  79.     {  
  80.         writeLock.lock();  
  81.         try  
  82.         {  
  83.             if (writeCount <= 0)  
  84.             {  
  85.                 logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");  
  86.                 try  
  87.                 {  
  88.                     logger.debug("Queue is empty, need wait....");  
  89.                     if(isInfinite && timeout<=0)  
  90.                     {  
  91.                         awake.await();  
  92.                         return -1;  
  93.                     }  
  94.                     else  
  95.                     {  
  96.                         return awake.awaitNanos(timeout);  
  97.                     }  
  98.                 }  
  99.                 catch (InterruptedException ie)  
  100.                 {  
  101.                     awake.signal();  
  102.                     throw ie;  
  103.                 }  
  104.             }  
  105.             else  
  106.             {  
  107.                 E[] tmpArray = readArray;  
  108.                 readArray = writeArray;  
  109.                 writeArray = tmpArray;  
  110.   
  111.                 readCount = writeCount;  
  112.                 readArrayHP = 0;  
  113.                 readArrayTP = writeArrayTP;  
  114.   
  115.                 writeCount = 0;  
  116.                 writeArrayHP = readArrayHP;  
  117.                 writeArrayTP = 0;  
  118.                   
  119.                 notFull.signal();  
  120.                 logger.debug("Queue switch successfully!");  
  121.                 return -1;  
  122.             }  
  123.         }  
  124.         finally  
  125.         {  
  126.             writeLock.unlock();  
  127.         }  
  128.     }  
  129.   
  130.     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException  
  131.     {  
  132.         if(e == null)  
  133.         {  
  134.             throw new NullPointerException();  
  135.         }  
  136.           
  137.         long nanoTime = unit.toNanos(timeout);  
  138.         writeLock.lockInterruptibly();  
  139.         try  
  140.         {  
  141.             for (;;)  
  142.             {  
  143.                 if(writeCount < writeArray.length)  
  144.                 {  
  145.                     insert(e);  
  146.                     if (writeCount == 1)  
  147.                     {  
  148.                         awake.signal();  
  149.                     }  
  150.                     return true;  
  151.                 }  
  152.                   
  153.                 //Time out  
  154.                 if(nanoTime<=0)  
  155.                 {  
  156.                     logger.debug("offer wait time out!");  
  157.                     return false;  
  158.                 }  
  159.                 //keep waiting  
  160.                 try  
  161.                 {  
  162.                     logger.debug("Queue is full, need wait....");  
  163.                     nanoTime = notFull.awaitNanos(nanoTime);  
  164.                 }  
  165.                 catch(InterruptedException ie)  
  166.                 {  
  167.                     notFull.signal();  
  168.                     throw ie;  
  169.                 }  
  170.             }  
  171.         }  
  172.         finally  
  173.         {  
  174.             writeLock.unlock();  
  175.         }  
  176.     }  
  177.   

抱歉!评论已关闭.