现在的位置: 首页 > 综合 > 正文

消费者模式的三种实现方式

2018年01月26日 ⁄ 综合 ⁄ 共 5953字 ⁄ 字号 评论关闭

1、使用synchronized、wait和notify

package ProductAndConsume;  
  
import java.util.List;  
  
public class Consume implements Runnable{  
    private List container = null;  
    private int count;  
    public Consume(List lst){  
     this.container = lst;  
    }  
 public void run() {  
    
  while(true){  
   synchronized (container) {  
    if(container.size()== 0){  
     try {  
      container.wait();
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
    try {  
     Thread.sleep(100);  
    } catch (InterruptedException e) {  
      e.printStackTrace();  
    }  
    container.remove(0);  
    container.notify();
    System.out.println("我吃了"+(++count)+"个");  
   }  
  }  
    
 }  
  
}  
  
package ProductAndConsume;  
  
import java.util.List;  
  
public class Product implements Runnable {  
 private List container = null;  
    private int count;  
 public Product(List lst) {  
  this.container = lst;  
 }  
  
 public void run() {  
  while (true) {  
   synchronized (container) {  
    if (container.size() > MultiThread.MAX) {  
     try {  
      container.wait();
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
    try {  
     Thread.sleep(100);  
    } catch (InterruptedException e) {  
     e.printStackTrace();  
    }  
    container.add(new Object());  
    container.notify();
    System.out.println("我生产了"+(++count)+"个");  
   }  
  }  
  
 }  
  
}  
  
package ProductAndConsume;  
  
import java.util.ArrayList;  
import java.util.List;  
  
public class MultiThread {  
 private List container = new ArrayList();  
 public final static int MAX = 5;  
  
  
  public static void main(String args[]){  
  
 MultiThread m = new MultiThread();  
  
  new Thread(new Consume(m.getContainer())).start();  
  new Thread(new Product(m.getContainer())).start();  
  new Thread(new Consume(m.getContainer())).start();  
  new Thread(new Product(m.getContainer())).start();  
 }  
 public List getContainer() {  
  return container;  
 }  
  
 public void setContainer(List container) {  
  this.container = container;  
 }  

2、使用Conditon、await和signal

package com.meituan.hyt.test5;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class Main {
    private List container = new ArrayList();
    public final static int MAX = 5;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();


    public static void main(String[] args) {
        Main m = new Main();
        Product product = m.new Product();
        Consume consume = m.new Consume();
        new Thread(product).start();
        new Thread(consume).start();
    }


    class Consume implements Runnable {
        private int count;

        public void run() {
            while (true) {
//                try {
//                    Thread.sleep(1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                lock.lock();
                try {
                    if (container.size() == 0) {
                        try {
                            System.out.println(Thread.currentThread().getName() + "队列空,我等待");
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    container.remove(0);
                    System.out.println(Thread.currentThread().getName() + "我消费了" + (++count) + "个");
                    notFull.signalAll();
                } finally {
                    lock.unlock();
                }
            }

        }

    }


    class Product implements Runnable {
        private int count;

        public void run() {
            while (true) {
//                try {
//                    Thread.sleep(1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                lock.lock();
                try {
                    if (container.size() > MAX) {
                        try {
                            System.out.println(Thread.currentThread().getName() + "队列满,我等待");
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    container.add(new Object());
                    System.out.println(Thread.currentThread().getName() + "我生产了" + (++count) + "个");
                    notEmpty.signalAll();
                } finally {
                    lock.unlock();
                }
            }

        }
    }
}

运行结果:

Thread-1我生产了1个
Thread-1我生产了2个
Thread-1我生产了3个
Thread-1我生产了4个
Thread-1我生产了5个
Thread-1我生产了6个
Thread-1队列满,我等待
Thread-2我消费了1个
Thread-2我消费了2个
Thread-2我消费了3个
Thread-2我消费了4个
Thread-2我消费了5个
Thread-2我消费了6个
Thread-2队列空,我等待
Thread-1我生产了7个
Thread-1我生产了8个
Thread-1我生产了9个
Thread-1我生产了10个
Thread-1我生产了11个
Thread-1我生产了12个
Thread-1队列满,我等待
Thread-2我消费了7个

......

如果取消注释代码,运行结果如下,因为sleep给了其他线程抢夺锁的机会。

Thread-1我生产了1个
Thread-2我消费了1个
Thread-1我生产了2个
Thread-2我消费了2个
Thread-1我生产了3个
Thread-2我消费了3个
Thread-1我生产了4个
Thread-2我消费了4个
Thread-1我生产了5个
Thread-2我消费了5个
Thread-1我生产了6个
Thread-2我消费了6个
Thread-1我生产了7个
Thread-2我消费了7个
Thread-1我生产了8个
Thread-2我消费了8个

......

3、使用jdk自带的数据结构LinkedBlockingQueue

LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。

由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

public class BlockingQueueTest2 {
    /**
     * 
     * 定义装苹果的篮子
     * 
     */
    public class Basket {
        // 篮子,能够容纳3个苹果
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 生产苹果,放入篮子
        public void produce() throws InterruptedException {
            // put方法放入一个苹果,若basket满了,等到basket有位置
            basket.put("An apple");
        }

        // 消费苹果,从篮子中取走
        public String consume() throws InterruptedException {
            // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
            return basket.take();
        }
    }

    // 定义苹果生产者
    class Producer implements Runnable {
        private String instance;
        private Basket basket;

        public Producer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 生产苹果
                    System.out.println("生产者准备生产苹果:" + instance);
                    basket.produce();
                    System.out.println("!生产者生产苹果完毕:" + instance);
                    // 休眠300ms
                    Thread.sleep(300);
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer Interrupted");
            }
        }
    }

    // 定义苹果消费者
    class Consumer implements Runnable {
        private String instance;
        private Basket basket;

        public Consumer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 消费苹果
                    System.out.println("消费者准备消费苹果:" + instance);
                    System.out.println(basket.consume());
                    System.out.println("!消费者消费苹果完毕:" + instance);
                    // 休眠1000ms
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
                System.out.println("Consumer Interrupted");
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueueTest2 test = new BlockingQueueTest2();

        // 建立一个装苹果的篮子
        Basket basket = test.new Basket();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = test.new Producer("生产者001", basket);
        Producer producer2 = test.new Producer("生产者002", basket);
        Consumer consumer = test.new Consumer("消费者001", basket);
        service.submit(producer);
        service.submit(producer2);
        service.submit(consumer);
        // 程序运行5s后,所有任务停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();
    }

}

抱歉!评论已关闭.