现在的位置: 首页 > 综合 > 正文

CountDownLatch and CyclicBarrier

2013年01月26日 ⁄ 综合 ⁄ 共 4842字 ⁄ 字号 评论关闭

CountDownLatch和CyclicBarrier,都是java.util.concurrent包中同步辅助类,常用的应用场景就是多线程运行计数开关,在指定个数的线程运行完成后,才可去做某件事情。CountDownLatch通过countDown()方法减少执行完成的线程数,而CyclicBarrier是通过await()方法来唤醒等待的线程。以下分别是他们的JavaDoc:

http://www.rritw.com/my/api/jdk-6u10-api-zh/java/util/concurrent/CountDownLatch.html

http://www.rritw.com/my/api/jdk-6u10-api-zh/java/util/concurrent/CyclicBarrier.html

先来看一个关于CyclicBarrier的一个示例,该示例来自于:http://xijunhu.iteye.com/blog/713433

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

/** */
/**
 * CyclicBarrier类似于CountDownLatch也是个计数器, 不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。 CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
 */
public class CyclicBarrierTest {

	public static class ComponentThread implements Runnable {
		CyclicBarrier barrier;// 计数器
		int ID; // 组件标识
		int[] array; // 数据数组

		// 构造方法
		public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
			this.barrier = barrier;
			this.ID = ID;
			this.array = array;
		}

		public void run() {
			try {
				array[ID] = new Random().nextInt(100);
				//System.out.println("Component " + ID + " generates: " + array[ID]);
				// 在这里等待Barrier处
				//System.out.println("Component " + ID + " sleep");
				barrier.await();
				//System.out.println("Component " + ID + " awaked");
				// 计算数据数组中的当前值和后续值
				int result = array[ID] + array[ID + 1];
				System.out.println("Component " + ID + " result: " + result);
			} catch (Exception ex) {
			}
		}
	}

	/** */
	/**
	 * 测试CyclicBarrier的用法
	 */
	public static void testCyclicBarrier() {
		for (;;) {
			final int[] array = new int[3];
			CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
				// 在所有线程都到达Barrier时执行,这个方法一定会在所有的等待线程被唤醒之前执行
				public void run() {
					//System.out.println("testCyclicBarrier run");
					array[2] = array[0] + array[1];
				}
			});

			// 启动线程
			new Thread(new ComponentThread(barrier, array, 0)).start();
			new Thread(new ComponentThread(barrier, array, 1)).start();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {}
		}
	}

	public static void main(String[] args) {
		CyclicBarrierTest.testCyclicBarrier();
	}
}

在这个示例中CyclicBarrier声明了两个parites(线程参与者),以及当两个parties的await()都执行后需要执行的一个Runnable命令,它的实现原理就是每个party的await()方法调用时,都去把当前parties的数量减一,然后看当前的parties是否是已经为0,如果为0了,那就执行待执行的runnable线程,否则就进行条件等待(Condition.await()),直到被唤醒为止,被唤醒的线程需要重新获取锁定,也就是说parties的执行顺序就由获取锁定的顺序而确定了,而不一定是谁先执行到await()方法谁先被唤醒。以下是dowait方法中的判断是否所有的parties都已经了await()方法并确定是否执行Runnable的代码:

int index = --count;
//检查当前的parties是否是已经全部都执行了await()方法
if (index == 0) {  // tripped
   boolean ranAction = false;
   try {
	   final Runnable command = barrierCommand;
	   //执行command的方法
	   if (command != null)
	       command.run();
	   ranAction = true;
	   //唤醒其它其它因为条件不满足(Condition.await())而等待的线程,通过调用Condition.singalAll方法实现,效果同调用获取了锁定方法中的notifyAll()一样
	   nextGeneration();
	   return 0;
   } finally {
       if (!ranAction)
           breakBarrier();
   }
}

方法testCyclicBarrier加了无线循环,可以看到输出component 0和1的输出,会出现交替的情况,这就是因为重新获取锁定的竟争的结果。

这里需要明确一点,Runnable也即barrierCommand一定会在所有的parties都执行完await()立即执行,其它parties中await()之后的代码逻辑一定会等到barrierCommand执行完后才继续执行。

这个和CountDownLatch的实现是不同的,CountDownLatch是在countDown为0的时候,可以执行待执行线程await()之后的代码,但是parties并不会等待它的执行,这里主要是由CPU的分配逻辑确定谁先运行了。下面是通过CountDownLatch对上面的CyclicBarrier的另外一种实现,不过因为他们的实现原理有一点差别,不能够完全达到相同的效果,

import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch同步辅助类示列,可以简单的理解为计数器。
 */
public class CountDownLatchTest {

	public static class ComponentThread implements Runnable {
		CountDownLatch latch;// 计数器
		int ID; // 组件标识
		int[] array; // 数据数组

		// 构造方法
		public ComponentThread(CountDownLatch latch, int[] array, int ID) {
			this.latch = latch;
			this.ID = ID;
			this.array = array;
		}

		public void run() {
			try {
				array[ID] = new Random().nextInt(100);
				System.out.println("Component " + ID + " generates: " + array[ID]);
				latch.countDown();
				System.out.println("Component " + ID + " awaked");
				// 计算数据数组中的当前值和后续值
				int result = array[ID] + array[ID + 1];
				System.out.println("Component " + ID + " result: " + result);
			} catch (Exception ex) {
			}
		}
	}

	/** */
	/**
	 * 测试用法
	 */
	public static void test() {
		for (;;) {
			final int[] array = new int[3];
			final CountDownLatch latch = new CountDownLatch(2);
			Thread thread = new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						latch.await();
					} catch (InterruptedException e) {
					}
					System.out.println("Do Add action");
					array[2] = array[0] + array[1];
				}
			});
			thread.start();

			// 启动线程
			new Thread(new ComponentThread(latch, array, 0)).start();
			new Thread(new ComponentThread(latch, array, 1)).start();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
			}
		}
	}

	public static void main(String[] args) {
		CountDownLatchTest.test();
	}
}

在CountDownLatch,是通过无限循环来判断是否所有的parties都执行了countDown(),如果都执行了或者被中断了就跳出await()方法,继续继续下面的代码,以下是由await()方法调用的、实现判断的方法:

/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

抱歉!评论已关闭.