现在的位置: 首页 > 综合 > 正文

java多线程之BlockingQueue深入分析

2013年12月01日 ⁄ 综合 ⁄ 共 4955字 ⁄ 字号 评论关闭

一、概述:

BlockingQueue作为线程容器,可以为线程同步提供有力的保障。


二、BlockingQueue定义的常用方法

1.BlockingQueue定义的常用方法如下:

  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用


        1)add(anObject):anObject加到BlockingQueue,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

        2)offer(anObject):表示如果可能的话,anObject加到BlockingQueue,即如果BlockingQueue可以容纳,则返回true,否则返回false.

        3)put(anObject):anObject加到BlockingQueue,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

        4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

        5)take():取走BlockingQueue里排在首位的对象,BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图addput 或offer 一个null 元素时,某些实现会抛出NullPointerExceptionnull 被用作指示poll 操作失败的警戒值。 


三、BlockingQueue的几个注意点


【1】BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity,超出此容量,便无法无阻塞地put 附加元素。没有任何内部容量约束的BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

【2】BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

【3】BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

【4】BlockingQueue 实质上 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的end-of-stream 或poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。


 四、简要概述BlockingQueue常用的四个实现类


        1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

        2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

        3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

        4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

    其中LinkedBlockingQueueArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.  


五、具体BlockingQueue的实现类的内部细节


有耐心的同学请看具体实现类细节:

1、ArrayBlockingQueue


    ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按
FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。

    这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)以及是否为公平锁(fair参数)。

在创建ArrayBlockingQueue的时候默认创建的是非公平锁,不过我们可以在它的构造函数里指定。这里调用ReentrantLock的构造函数创建锁的时候,调用了:

public ReentrantLock(boolean fair) {

sync = (fair)? new FairSync() : new NonfairSync();

}

FairSync/ NonfairSync是ReentrantLock的内部类:

线程按顺序请求获得公平锁,而一个非公平锁可以闯入,且当它尚未进入等待队列,就会和等待队列head结点的线程发生竞争,如果锁的状态可用,请求非公平锁的线程可在等待队列中向前跳跃,获得该锁。内部锁synchronized没有提供确定的公平性保证。

分三点来讲这个类:

2.1 添加新元素的方法:add/put/offer

2.2 该类的几个实例变量:takeIndex/putIndex/count/

2.3 Condition实现


1.1 添加新元素的方法:add/put/offer


首先,谈到添加元素的方法,首先得分析以下该类同步机制中用到的锁:

Java代码

[java] view
plain
copy

  1. lock = new ReentrantLock(fair);       
  2. notEmpty = lock.newCondition();//Condition Variable 1       
  3. notFull =  lock.newCondition();//Condition Variable 2    

这三个都是该类的实例变量,只有一个锁lock,然后lock实例化出两个Condition,notEmpty/noFull分别用来协调多线程的读写操作。

Java代码

[java] view
plain
copy

  1. public boolean offer(E e) {       
  2.         if (e == nullthrow new NullPointerException();       
  3.         final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁       
  4.         lock.lock();//请求锁直到获得锁(不可以被interrupte)       
  5.         try {       
  6.             if (count == items.length)//如果队列已经满了       
  7.                 return false;       
  8.             else {       
  9.                 insert(e);       
  10.                 return true;       
  11.             }       
  12.         } finally {       
  13.             lock.unlock();//       
  14.         }       
  15. }       
  16. 看insert方法:       
  17. private void insert(E x) {       
  18.         items[putIndex] = x;       
  19.         //增加全局index的值。       
  20.         /*     
  21.         Inc方法体内部:     
  22.         final int inc(int i) {     
  23.         return (++i == items.length)? 0 : i;     
  24.             }     
  25.         这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满)     
  26.         */      
  27.         putIndex = inc(putIndex);        
  28.         ++count;       
  29.         notEmpty.signal();//wake up one waiting thread       
  30. }      

Java代码

[java] view
plain
copy

  1. public void put(E e) throws InterruptedException {       
  2.         if (e == nullthrow new NullPointerException();       
  3.         final E[] items = this.items;       
  4.         final ReentrantLock lock = this.lock;       
  5.         lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted       
  6.         try {       
  7.             try {       
  8.                 while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态       
  9.                     notFull.await();       
  10.             } catch (InterruptedException ie) {       
  11.                 notFull.signal(); // propagate to non-interrupted thread       
  12.                 throw ie;       
  13.             }       
  14.             insert(e);       
  15.         } finally {       
  16.             lock.unlock();       
  17.         }       
  18. }      

Java代码

[java] view
plain
copy

  1. public 

抱歉!评论已关闭.