现在的位置: 首页 > 综合 > 正文

BlockingQueue

2012年09月08日 ⁄ 综合 ⁄ 共 4703字 ⁄ 字号 评论关闭

 BlockingQueue接口是在Queue基础上增加了两个操作
 两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

 (
试图向已满队列中放入元素会导致放入操作受阻塞,直到BlockingQueue里有新的空间才会被唤醒继续操作;

 试图从空队列中检索元素将导致类似阻塞,直到BlocingkQueue进了新货才会被唤醒。

 BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,

 某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。 

 
BlockingQueue可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 额外的元素。

 没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。 


 BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。

 因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不会有效执行,

 只能有计划地偶尔使用,比如在取消排队信息时。 
 BlockingQueue实现是线程安全的。

 所有排队方法都可以使用内部锁定或其他形式的并发控制来自动达到它们的目的。

 然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有必要自动执行

 (可能是空实现,它会抛出UnsupportOperationException),

 除非在实现中特别说明。因此,举例来说,在 c 中添加了的一些元素后,addAll(c) 有可能失败(抛出一个异常)。 

 BlockingQueue 实质上不支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。

 这种功能的需求和使用有依赖于实现的倾向。

 例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。 

 内存一致性效果:一致性效果:和其他并发集合一样。把一个元素放入到队列的线程的优先级高与对元素的访问和移除的线程

 注意1:BlockingQueue 可以安全地与多个生产者和多个使用者一起使用。 

 例1是基于典型的生产者-使用者场景的一个用例。

 注意2BlockingQueue 实现是线程安全的所有排队方法都可以使用内部锁定或其他形式的并发控制来自动达到它们的目的
 注意3:在JDK5/6中,LinkedBlockingQueue和ArrayBlocingQueue等对象的poll(long timeout, TimeUnit unit)存在内存泄露

   Leak的对象是AbstractQueuedSynchronizer.Node,

   据称JDK5会在Update12里Fix,JDK6会在Update2里Fix。

   更加详细参考http://sesame.javaeye.com/blog/428026和 http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=2143840 

声明的接口

Public Methods
abstract boolean add(E
e)
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if
no space is currently available.
abstract boolean contains(Object o)
Returns true if this queue contains the specified element.
abstract int drainTo(Collection<? super E>
c)
Removes all available elements from this queue and adds them to the given collection.
移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
abstract int drainTo(Collection<? super E>
c, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given collection.
最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
abstract boolean offer(E
e, long timeout, TimeUnit unit)
Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.
将指定的元素插入到此队列的尾部,如果没有可用空间,将等待指定的等待时间(如果有必要)。 
abstract boolean offer(E
e)
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently
available.
将指定的元素插入到此队列的尾部(如果可能),如果此队列已满,则立即返回。
abstract E poll(long
timeout, TimeUnit unit)
Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.
检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)。 
abstract void put(E
e)
Inserts the specified element into this queue, waiting if necessary for space to become available.
将指定的元素添加到此队列的尾部,如果必要,将等待可用的空间。 
abstract int remainingCapacity()
Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.
abstract boolean remove(Object o)
Removes a single instance of the specified element from this queue, if it is present.
abstract E take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待。

ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组实现的,更多内容请参考《ArrayBlockingQueue
LinkedBlockingQueue
  LinkedBlockingQueue是一个基于链接节点的、范围任意的blocking queue的实现更多内容请参考《
LinkedBlockingQueue
DelayQueue

 DelayQueue是实现Delayed接口的元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。

关于此的更多内容请参考《
DelayQueue
PriorityBlockingQueue
PriorityBlockingQueue一个无界的阻塞队列,它使用与类PriorityQueue相同的顺序规则,并且提供了阻塞检索的操作。

关于此的详细内容请参考《
PriorityBlockingQueue
SynchronousQueue
 
SynchronousQueue也好是BlockingQueue的一种实现。它是这样的一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
关于此的详细内容请参考《SynchronousQueue
 例1
 以下是来自java文档基于典型的生产者-使用者场景的一个用例。
 class Producer implements Runnable {

   private final BlockingQueue queue;

   Producer(BlockingQueue q) { queue = q; }

   
public void run() {

     try {

       while (true) { queue.put(produce()); }

     } catch (InterruptedException ex) { ... handle ...}

   }

   Object produce() { ... }

 }

 
class Consumer implements Runnable {

   private final BlockingQueue queue;

   Consumer(BlockingQueue q) { queue = q; }

   
public void run() {

     
try {

       
while (true) { consume(queue.take()); }

     } 
catch (InterruptedException ex) { ... handle ...}

   }

   void consume(Object x) { ... }

 }

 
class Setup {

   
void main() {

     BlockingQueue q = new SomeQueueImplementation();

     Producer p = new Producer(q);

     Consumer c1 = new Consumer(q);

     Consumer c2 = new Consumer(q);

     new Thread(p).start();

     new Thread(c1).start();

     new Thread(c2).start();

   }

 }

抱歉!评论已关闭.