1. 包介绍:java并发编程包括三个包java.util.concurrent、java.util.concurrent.atomic、java.util.concurrent.locks,分别是并发、原子、锁。
2. 如何解决并发:在单线程模式中如同一条流水线,从头到尾依此执行,多线程就是并发的去执行。拿做鞋子来比喻:单线程会先做鞋底,再做鞋面,再组装。多线程会开三条线程,一条做鞋底,一条做鞋面,一条组装。那种模式更加有效率?并发!那种模式更加符合实际的生产方式?并发!可见并发确实是好,但是大家好才是真的好。再拿做鞋子比喻,做鞋底的流水线做了十只鞋底,而做鞋面的流水线却只做了七只鞋面,这时组装的流水线就不好了。如何解决并发带来的问题?我们发现单线程模式是没有并发问题的(废话),答案就在废话里,现实告诉我们鱼与熊掌是可以兼得的,想要享受多线程带来的高效又不必忍受并发的煎熬这需要一点技巧。单线程是依次执行,现实中如何保持次序?排队。在编程中排队叫队列(queue),并发问题的解决方案中都不外乎队列,比如上面做鞋子的问题我们可以这样开一个队列:做鞋子和鞋面作为一个步骤先进入队列,组装作为第二个步骤后进入队列。根据先进先出(FIFO)原则,流水线永远会这样开,做N个鞋子鞋面完成后再组装N只鞋子,如果你设置N=1效率还会再提高。
3. 相关概念:为了理解并发,需要了解以下几个概念
a. 内存同步:共享的数据在任一线程中其值都是相同的,因为数据在内存中会缓存的原因一个共享数据在每个线程中都有自己的副本,这样做是为了提高效率,但是这会出现内存中数据不同步的情况,这是因为副本更新没那么快。
b. 可重入与打断:线程执行到代码的某行被打断了,这个线程就会在此休眠下来,当这个线程因为某种原因如获得了锁被唤醒时,这个线程从上次被打断的地方进入,就叫可重入。
c. 加锁、占有锁与释放锁:加锁就像是告诉别人这是我的不准别人用。淘宝上有买B加锁的装置,这可以打一个非常形象的比喻:老公给老婆装了锁,老公当然是有钥匙的,他可以打开锁嘿咻。这个过程中开锁是加锁,强势插入叫做占有锁,完事后锁上叫释放锁。如果老婆有个情人,他趁老公不在的时候也偷偷配了钥匙,所以他也可以打开锁嘿咻。但是他必须等老公不在的时候才可以上(获得锁),这样就能有效阻止并发。
d. 线程休眠与唤醒:休眠与唤醒好像在可重入中已经讲了。
e. 公平参数:公平参数会使等待锁释放最久的线程最先会唤起,这和队列最先进入的在队列中时间最长有相似之处。
4.volatile修饰符与synchronized关键字:volatile是易变的意思,synchronized是同步的意思。volatile声明的变量能保持在所有线程中数据同步,因为前面提到的内存同步概念,这个修饰符会使线程不保留自己的副本,而是直接指向真正的对象,volatile对效率的影响并不是很大。synchronized通过给对象加锁达到线程同步的目的,所有没有获得锁的对象都会排队等候锁的释放,这样的操作对效率影响非常大。
5. java.util.concurrent工具包:这个包下有线程执行池、队列、并发集、纳秒工具类等,此处因为讲并发编程的原因只讲同步器。同步器包括四个类Semaphore、CountDownLatch、CyclicBarrier、Exchanger,这四个类分别适用于不同的并发类型。理解他们可以总结他们的两个共同的:1.设置屏障,不让其他线程通过2.解除屏障,其他线程可通过,下面分别论述这四种同步器
Semaphore:发布特定数量通行证的同步器,每个线程通过acquire()方法请求通行证,Semaphore发放一个通行证,如果通行证发放完了,就设置屏障,不再让其他线程通过,这些线程就会休眠。通过release()交还通行证,这时Semaphonre又有了通行证发放了,就解除屏障,让等待的线程通过。通过构造方法需传入一个int类型的值来设置发布多少个通行证。
public class ResourcePool { ArrayList<Object> pool=null;//资源池 Semaphore pass=null;//通行证 public ResourcePool(int size){ pool=new ArrayList<>(); for(int i=0;i<size;i++){ pool.add("resours:"+i); pass=new Semaphore(1); } } public String get() throws InterruptedException { // 获取通行证,只有得到通行证后才能得到资源 System.out.println("尝试获取通行证"); pass.acquire();//请求通行证 System.out.println("得到通行证"); String result = (String) pool.remove(0); System.out.println("资源 " + result + " 被取走"); return result; } public void put(String resource) { // 归还通行证,并归还资源 System.out.println("归还通行证"); pass.release(); System.out.println("资源 " + resource + " 被归还"); pool.add(resource); } public static void testPool() { final ResourcePool pool = new ResourcePool(10); Runnable worker = new Runnable() { public void run() { String resource = null; try { //取得resource resource = pool.get(); //使资源被占用一段时间 Thread.sleep(1000); } catch (InterruptedException ex) { } //归还resource pool.put(resource); } }; // 启动12个任务 for (int i = 0; i < 12; i++) { new Thread(worker).start(); } } }
CountDownLatch:等待所有线程准备完成的的同步器:CountDownLatch通过构造方法传入计数器。CountDownLatch通过await()方法让当前线程在此等待既休眠。通过countDown()方法使得计数器减一,当计数器减到0时就自动唤醒所以等待的线程。
public class CountDownLatchWorker implements Runnable{ private final CountDownLatch doneSingal; private final int i; public CountDownLatchWorker2(CountDownLatch doneSingal,int i) { super(); this.doneSingal = doneSingal; this.i=i; } @Override public void run() { // TODO Auto-generated method stub System.out.println("执行第"+i+"部分"); doneSingal.countDown();//计数器减一,并在此等待 } public static void testCountDownLatchWorker2() throws InterruptedException{ int N=10; CountDownLatch countDownLatch=new CountDownLatch(N); ExecutorService executor=Executors.newCachedThreadPool(); for(int i=0;i<N;i++){ executor.execute(new CountDownLatchWorker2(countDownLatch, i)); } System.out.println("await befer"); countDownLatch.await();// 主线程会在此等待 System.out.println("await end"); System.out.println("所有的子线程执行完毕,可以通过await"); executor.shutdown(); } }
CyclicBarrier:可重置计数器的同步器:这个同步器和CountDownLatch类似,不同的是他有一个reset()方法可以使得计数器恢复到初始状态。当然用法也不相同,CyclicBarrier只需要调用await()方法就可以使得计数器减一并使当前线程在此等待,当计数器为0时唤醒所有等待的线程。其中他有一个特殊的构造方法CyclicBarrier(intparties,Runnable
barrierAction),当计数器为时会执行barrierAction。这个同步器非常适合解决上面做鞋子的问题,织线程会等待做鞋底和鞋面都完成后再执行。
public class CyclicBarrierWorker implements Runnable{ private CyclicBarrier barrier;// int i=0; public CyclicBarrierWorker(CyclicBarrier barrier,int i) { super(); this.i=i; this.barrier = barrier; } @Override public void run() { // TODO Auto-generated method stub System.out.println("执行第"+i+"个任务"+barrier.getNumberWaiting()); try { barrier.await();//通知任务已经完成,并且计数器减一 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void testCylicBarrier() { // TODO Auto-generated method stub int N=4; CyclicBarrier barrier=new CyclicBarrier(N, new Runnable() { @Override public void run() { // TODO Auto-generated method stub //改线程会等待所有的子任务执行完毕 //当计数器会0时,该线程会执行 System.out.println("子任务全部执行完毕,总任务执行"); } }); for(int i=0;i<barrier.getParties();i++){ new Thread(new CyclicBarrierWorker(barrier, i)).start(); } } }
Exchanger:两个线程执行到交换点会互相等待当两个线程都拿到另一线程的交换值时就会重入。exchange(V
x)会等待另一个线程的值传过来。在值未传过来之前他不会执行到下一步。
public class MyExchanger { Exchanger<String> exchanger=new Exchanger<String>(); class A implements Runnable{ @Override public void run() { // TODO Auto-generated method stub try { String s1="我是壹"; System.out.println("start A"); String s=exchanger.exchange(s1);//现在在此等待B的结过 System.out.println(s1+" ex:"+s); System.out.println("end A"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } class B implements Runnable{ @Override public void run() { // TODO Auto-generated method stub try { String s2="i am a two"; System.out.println("start B"); String s= exchanger.exchange(s2);//线程在中断,等待A线程结果 System.out.println(s2+" ex:"+s); System.out.println("end B"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //测试效果,可以在exchange方法处打上断点看效果 public void tesetExchanger(){ new Thread(new A()).start(); new Thread(new B()).start(); } }