package cn.itcast.hemai2; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。 * 试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。 * ArrayBlockingQueue 只有put方法和take方法才具有阻塞功能 * @author admin * */ public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue queue = new ArrayBlockingQueue(3); for(int i=0;i<2;i++){ new Thread(){ public void run(){ while(true){ try { Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "准备放数据!"); queue.put(1); System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while(true){ try { //将此处的睡眠时间分别改为100和1000,观察运行结果 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "准备取数据!"); queue.take(); System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
package cn.itcast.hemai2; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 阻塞队列ArrayBlockingQueue的一个应用: * 用两个具有1个空间的队列来实现同步通知的功能。 * @author admin * */ public class BlockingQueueCommunication { /** * @param args */ public static void main(String[] args) { final Business business = new Business(); new Thread( new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub(i); } } } ).start(); for(int i=1;i<=50;i++){ business.main(i); } } static class Business { BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); { Collections.synchronizedMap(null); try { System.out.println("xxxxxdfsdsafdsa"); queue2.put(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void sub(int i){ try { queue1.put(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } for(int j=1;j<=10;j++){ System.out.println("sub thread sequece of " + j + ",loop of " + i); } try { queue2.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void main(int i){ try { queue2.put(1); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } for(int j=1;j<=100;j++){ System.out.println("main thread sequece of " + j + ",loop of " + i); } try { queue1.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }