1、使用synchronized、wait和notify
package ProductAndConsume; import java.util.List; public class Consume implements Runnable{ private List container = null; private int count; public Consume(List lst){ this.container = lst; } public void run() { while(true){ synchronized (container) { if(container.size()== 0){ try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } container.remove(0); container.notify(); System.out.println("我吃了"+(++count)+"个"); } } } } package ProductAndConsume; import java.util.List; public class Product implements Runnable { private List container = null; private int count; public Product(List lst) { this.container = lst; } public void run() { while (true) { synchronized (container) { if (container.size() > MultiThread.MAX) { try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } container.add(new Object()); container.notify(); System.out.println("我生产了"+(++count)+"个"); } } } } package ProductAndConsume; import java.util.ArrayList; import java.util.List; public class MultiThread { private List container = new ArrayList(); public final static int MAX = 5; public static void main(String args[]){ MultiThread m = new MultiThread(); new Thread(new Consume(m.getContainer())).start(); new Thread(new Product(m.getContainer())).start(); new Thread(new Consume(m.getContainer())).start(); new Thread(new Product(m.getContainer())).start(); } public List getContainer() { return container; } public void setContainer(List container) { this.container = container; }
2、使用Conditon、await和signal
package com.meituan.hyt.test5; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { private List container = new ArrayList(); public final static int MAX = 5; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args) { Main m = new Main(); Product product = m.new Product(); Consume consume = m.new Consume(); new Thread(product).start(); new Thread(consume).start(); } class Consume implements Runnable { private int count; public void run() { while (true) { // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } lock.lock(); try { if (container.size() == 0) { try { System.out.println(Thread.currentThread().getName() + "队列空,我等待"); notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } container.remove(0); System.out.println(Thread.currentThread().getName() + "我消费了" + (++count) + "个"); notFull.signalAll(); } finally { lock.unlock(); } } } } class Product implements Runnable { private int count; public void run() { while (true) { // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } lock.lock(); try { if (container.size() > MAX) { try { System.out.println(Thread.currentThread().getName() + "队列满,我等待"); notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } container.add(new Object()); System.out.println(Thread.currentThread().getName() + "我生产了" + (++count) + "个"); notEmpty.signalAll(); } finally { lock.unlock(); } } } } }
运行结果:
Thread-1我生产了1个
Thread-1我生产了2个
Thread-1我生产了3个
Thread-1我生产了4个
Thread-1我生产了5个
Thread-1我生产了6个
Thread-1队列满,我等待
Thread-2我消费了1个
Thread-2我消费了2个
Thread-2我消费了3个
Thread-2我消费了4个
Thread-2我消费了5个
Thread-2我消费了6个
Thread-2队列空,我等待
Thread-1我生产了7个
Thread-1我生产了8个
Thread-1我生产了9个
Thread-1我生产了10个
Thread-1我生产了11个
Thread-1我生产了12个
Thread-1队列满,我等待
Thread-2我消费了7个
......
如果取消注释代码,运行结果如下,因为sleep给了其他线程抢夺锁的机会。
Thread-1我生产了1个
Thread-2我消费了1个
Thread-1我生产了2个
Thread-2我消费了2个
Thread-1我生产了3个
Thread-2我消费了3个
Thread-1我生产了4个
Thread-2我消费了4个
Thread-1我生产了5个
Thread-2我消费了5个
Thread-1我生产了6个
Thread-2我消费了6个
Thread-1我生产了7个
Thread-2我消费了7个
Thread-1我生产了8个
Thread-2我消费了8个
......
3、使用jdk自带的数据结构LinkedBlockingQueue
LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。
由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
public class BlockingQueueTest2 { /** * * 定义装苹果的篮子 * */ public class Basket { // 篮子,能够容纳3个苹果 BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3); // 生产苹果,放入篮子 public void produce() throws InterruptedException { // put方法放入一个苹果,若basket满了,等到basket有位置 basket.put("An apple"); } // 消费苹果,从篮子中取走 public String consume() throws InterruptedException { // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部) return basket.take(); } } // 定义苹果生产者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + instance); basket.produce(); System.out.println("!生产者生产苹果完毕:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定义苹果消费者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + instance); System.out.println(basket.consume()); System.out.println("!消费者消费苹果完毕:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 建立一个装苹果的篮子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生产者001", basket); Producer producer2 = test.new Producer("生产者002", basket); Consumer consumer = test.new Consumer("消费者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行5s后,所有任务停止 // try { // Thread.sleep(1000 * 5); // } catch (InterruptedException e) { // e.printStackTrace(); // } // service.shutdownNow(); } }