现在的位置: 首页 > 综合 > 正文

BlockingQueue 使用 BlockingQueue 使用

2017年11月04日 ⁄ 综合 ⁄ 共 3939字 ⁄ 字号 评论关闭

BlockingQueue 使用

分类: java线程 14人阅读 评论(0) 收藏 举报

 本例介绍一个特殊的队列:BlockingQueue,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入 等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等 待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。 
       本例再次实现前面介绍的篮子程序,不过这个篮子中最多能放得苹果数不是1,可以随意指定。当篮子满时,生产者进入等待状态,当篮子空时,消费者等待。 

       BlockingQueue定义的常用方法如下: 
                add(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。 
                offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。 
                put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。 
                poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。 
                take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。 

       BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类: 
                ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。 
                LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue 有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序 的。 
                PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。 
                SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。 

       LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致 LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于 ArrayBlockingQueue。

 

Java代码  收藏代码
  1. <span style="font-size: small;">package com.yoyosys.smartstorage.consumer;  
  2.   
  3. import java.util.concurrent.BlockingQueue;  
  4. /** 
  5.  * to-do. 
  6.  * 
  7.  * @author Denghaiping 
  8.  * @version 1.0 
  9.  */  
  10. public class Fetcher implements Runnable  
  11. {  
  12.   @SuppressWarnings("unused")  
  13.   private BlockingQueue<String> queue = null ;  
  14.   
  15.   public Fetcher(BlockingQueue<String> queue)  
  16.   {  
  17.     this.queue = queue;  
  18.   }  
  19.   
  20.   public void run()   
  21.   {  
  22.     try {  
  23.       while (true) {  
  24.         queue.put("segment-name-"+i);  
  25.         System.out.println("ThreadName : "+ Thread.currentThread().getName() +" 生产完成");  
  26.       }     
  27.     } catch (InterruptedException ex) {  
  28.       ex.printStackTrace();      
  29.     }     
  30.   }  
  31. }                                                                                                                                                                        
  32.   
  33. </span>  

 

Java代码  收藏代码
  1. <span style="font-size: small;"> */  
  2. package com.yoyosys.smartstorage.consumer;  
  3.   
  4. import java.util.concurrent.ArrayBlockingQueue;  
  5. import java.util.concurrent.BlockingQueue;  
  6. /** 
  7.  * to-do. 
  8.  * 
  9.  * @author Denghaiping 
  10.  * @version 1.0 
  11.  */  
  12. public class Indexer implements Runnable  
  13. {  
  14.   
  15.   private  BlockingQueue<String> queue ;  
  16.   public Indexer(BlockingQueue<String> queue)  
  17.   {  
  18.     this.queue = queue;  
  19.   }  
  20.   
  21.   public void run()  
  22.   {  
  23.     try {  
  24.       while(true) {  
  25.         Thread.sleep(1000);  
  26.         String name = queue.take();  
  27.         System.out.println("ThreadName : " +Thread.currentThread().getName()+ " 消费完成 " +name);  
  28.       }  
  29.     } catch (InterruptedException ex) {  
  30.       ex.printStackTrace();   
  31.     }  
  32.   }  
  33. }   
  34.   
  35. </span>  

 

Java代码  收藏代码
  1. <span style="font-size: small;">package com.yoyosys.smartstorage.consumer;  
  2.   
  3. import java.util.concurrent.*;  
  4.   
  5. /** 
  6.  * to-do. 
  7.  * 
  8.  * @author Denghaiping 
  9.  * @version 1.0 
  10.  */  
  11. public class TestConsumer  
  12. {  
  13.   private static BlockingQueue<String> queue = new ArrayBlockingQueue<String> (10);  
  14.     
  15.     
  16.   public static void main(String [] args)   
  17.   {  
  18.     ExecutorService service = Executors.newCachedThreadPool();    
  19.     Fetcher producer = new Fetcher(queue);    
  20.     Indexer consumer = new Indexer(queue);    
  21.     Indexer consumer1 = new Indexer(queue);    
  22.     service.submit(producer);    
  23.     service.submit(consumer);  
  24.     service.submit(consumer1);  
  25.     // 程序运行5s后,所有任务停止    
  26.     try {  
  27.       Thread.sleep(5000);  
  28.     } catch (InterruptedException e) {  
  29.       e.printStackTrace();  
  30.     }  
  31.     //service.shutdownNow();    
  32.   }  
  33. }  
  34.   
  35. </span>  

抱歉!评论已关闭.