现在的位置: 首页 > 综合 > 正文

Java线程_06_wait/notify/synchronized

2013年08月02日 ⁄ 综合 ⁄ 共 4705字 ⁄ 字号 评论关闭

    wait/notify/synchronized

    线程等待/通知机制实现了多个线程基于特定条件的通信机制。

    线程等待/通知机制必须配合线程同步机制避免资源竞争(Race
    Condition
    )产生的数据异常。

    场景

    一、等待-通知机制细节(为什么需要引入同步机制?)

  1. 第一个thread验证通信条件未通知需要调用wait()
  2. 第二个thread改变通信条件
  3. 第二个thread调用notify(),此时第一个thread并未调用wait
  4. 第一个thread调用了wait()

此种情况是在没有加入同步机制的情况,如果不引入同步机制那么thread#1的处理逻辑就不够完备。

Java中使用wait()/notify()时约束必须在synchronized块中调用。

二、thread收到通知(通信条件为什么必须重新检验?)

  1. Thread#1调用取得同步lockmethod
  2. Thread#1检验通信条件不是所需的状态
  3. Thread#1调用wait()并释放lock
  4. Thread#2调用取得同一个lockmethod
  1. Thread#3调用同步lockmethod,但因为lock已被Thread#2占有并未释放,所以Thread#3是阻塞状态
  1. Thread#2改变通信条件并调用notifyAll
  2. Thread#2逻辑处理完成,释放lock
  1. Thread#3取得lock
  2. Thread#3验证通信条件,此时通信条件是所需的状态
  1. Thread#3进行逻辑处理并将通信条件重置
  2. Thread#3逻辑处理完成,释放lock
  3. Thread#1收到通知,此时通信条件不是所需的状态应该再次调用wait

Thread#1因为通信条件不成立而调用wait(),一段时间被调用notifyAll()的线程唤醒,但此时通信条件在notifyAll调用后与Thread#1被唤醒前可能已经改变,所以就要求Thread#1必须在唤醒后重新检查通信条件。

notify()/notifyAll()

当有多个thread在等待无法确定具体哪一个thread会在线程调用notify()后收到通知,这种状况与多个因素有关(JVM的实现、程序运行时调度)。

为什么会需要notifyAll(),这与通信条件有关,当有多个各自通信条件不相同的线程在等待,此时因为无法确认调用notify()之后到底哪个线程会被唤醒(并且当前通信条件并不能满足被唤醒线程自身所需的通信条件),所以通过notifyAll()唤醒所有thread

示例

1.等待-通知与sychronized

package org.ybygjy.thread;

/**
 * 等待-通知
 * @author WangYanCheng
 * @version 2012-11-18
 */
public class WaitAndNotify {
    // TODO 一个线程负责生产
    // TODO 二个线程负责消费
    // TODO 一个线程负责消费%2==0
    // TODO 一个线程负责消费%2==1
    public static void main(String[] args) {
        VariableQueue vqInst = new VariableQueue();
        new Producer(vqInst).start();
        new Consumer1(vqInst).start();
        new Consumer2(vqInst).start();
    }
}

class VariableQueue {
    private int value;

    /**
     * @return the value
     */
    public synchronized int getValue() {
        return value;
    }

    /**
     * @param value the value to set
     */
    public synchronized void setValue(int value) {
        this.value = value;
    }
}

class Producer extends Thread {
    private VariableQueue vqInst;

    public Producer(VariableQueue vqInst) {
        this.vqInst = vqInst;
    }

    public void run() {
        while (true) {
            synchronized (vqInst) {
                vqInst.setValue(vqInst.getValue() + 1);
                vqInst.notifyAll();
                try {
                    vqInst.wait(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class Consumer1 extends Thread {
    private VariableQueue vqInst;

    public Consumer1(VariableQueue vqInst) {
        this.vqInst = vqInst;
    }

    public void run() {
        while (true) {
            synchronized (vqInst) {
                while (vqInst.getValue() % 2 == 0) {
                    System.out.println("模2余0==>".concat(String.valueOf(vqInst.getValue())));
                    try {
                        vqInst.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

class Consumer2 extends Thread {
    private VariableQueue vqInst;

    public Consumer2(VariableQueue vqInst) {
        this.vqInst = vqInst;
    }

    public void run() {
        while (true) {
            synchronized (vqInst) {
                while (vqInst.getValue() % 2 == 1) {
                    System.out.println("模2余1==>".concat(String.valueOf(vqInst.getValue())));
                    try {
                        vqInst.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

 

2.条件变量

package org.ybygjy.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 等待-通知
 * @author WangYanCheng
 * @version 2012-11-18
 */
public class ConditionVariable {
    private VariableQueue vqInst;

    public ConditionVariable() {
        vqInst = new VariableQueue();
    }

    // TODO 1.一个线程负责生产
    // TODO 2.二个线程负责消费
    // TODO 2.1一个线程负责消费%2==0
    // TODO 2.2一个线程负责消费%2==1
    public static void main(String[] args) {
        ConditionVariable cvInst = new ConditionVariable();
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(cvInst.new Producer(lock, condition)).start();
        new Thread(cvInst.new Consumer1(lock, condition)).start();
        new Thread(cvInst.new Consumer2(lock, condition)).start();
    }

    class VariableQueue {
        private int value;

        /**
         * @return the value
         */
        public int getValue() {
            return value;
        }

        /**
         * @param value the value to set
         */
        public void setValue(int value) {
            this.value = value;
        }
    }

    class Producer extends Thread {
        private Lock lock;
        private Condition condition;

        public Producer(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }

        public void run() {
            try {
                lock.lock();
                while (true) {
                    vqInst.setValue(vqInst.getValue() + 1);
                    condition.signalAll();
                    condition.await();
                    // condition.await(500, TimeUnit.MILLISECONDS);
                    sleep(500);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    class Consumer1 extends Thread {
        private Lock lock;
        private Condition condition;

        public Consumer1(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }

        public void run() {
            try {
                lock.lock();
                while (true) {
                    if (vqInst.getValue() % 2 == 0) {
                        System.out.println("模2余0==>".concat(String.valueOf(vqInst.getValue())));
                        condition.signal();
                    }
                    condition.await();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    class Consumer2 extends Thread {
        private Lock lock;
        private Condition condition;

        public Consumer2(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }

        public void run() {
            try {
                lock.lock();
                while (true) {
                    if (vqInst.getValue() % 2 == 1) {
                        System.out.println("模2余1==>".concat(String.valueOf(vqInst.getValue())));
                        condition.signal();
                    }
                    condition.await();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

 

后续

当前示例代码中关于条件变量相关并未实际应用,所以并不一定完全正确,还有待检验。

资料

  1. Java线程3th
  2. http://en.wikipedia.org/wiki/Race_condition

 

 

抱歉!评论已关闭.