现在的位置: 首页 > 综合 > 正文

聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器

2017年12月19日 ⁄ 综合 ⁄ 共 4867字 ⁄ 字号 评论关闭

这篇讲讲Exchanger交互器,它是一种比较特殊的两方(Two-Party)栅栏,可以理解成Exchanger是一个栅栏,两边一方是生产者,一方是消费者,

1. 生产者和消费者各自维护了一个容器,生产者往容器里生产东西,消费者从容器里消费东西。

2. 当生产者的容器是满的时候,它需要通过Exchanger向消费者交换,把满的容器交换给消费者,从消费者手里拿到空的容器继续生产。

3. 当消费者的容器是空的时候,它需要通过Exchanger向生产者交换,把空的容器交换给生产者,从生产者手里拿到满的容器继续消费。

所以我们看到这个过程中至少有5个组件

1. Exchanger栅栏

2. 生产者

3. 消费者

4. 生产者的容器

5. 消费者的容器

更复杂的情况是生产者有多个人在生产,消费者有多个人在消费,每个人都有自己的容器。这里有一个隐含的意思是生产者和消费者不挑容器,只要是空的或者满的都能用。Exchanger的匹配是根据Hash来的,所以可能出现不同的人生产者或消费者对应到同一个Hash值。

Exchanger使用了Slot槽来表示一个位置,生产者和消费者都可以被Hash到一个槽中。

private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * Slot array.  Elements are lazily initialized when needed.
     * Declared volatile to enable double-checked lazy construction.
     */
    private volatile Slot[] arena = new Slot[CAPACITY];

创建了一个内部类Node来封装要交互者的线程和要交换的容器

private static final class Node extends AtomicReference<Object> {
        /** The element offered by the Thread creating this node. */
        public final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        public volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }

算法的主要部分就是交换的过程,下面简单说说交互的逻辑

1. 先根据当前线程的id计算出一个Hash值作为索引index

2. 然后轮询,如果index对应的Slot槽是null就生成一个,表示还没有人使用这个槽位

3. 如果对应的Slot已经有线程了,并且CAS设置它为null也成功了,表示生产者和消费者匹配上了,再通过CAS把自己的item设置给对方Node引用,然后把之前等待的一方唤醒,把对方Node里面的item返回给自己。这样相当于后来者拿到了之前等待者的item,并把后来者自己的item设置成了之前等待者的Node引用

当先来者被从自旋状态唤醒后,会从自己的Node引用中获取item,如果非空并且不是CANCEL,就证明有人跟它交换了,也拿到了对方的item返回了,否则就是超时取消了

4. 如果对应的Slot没有线程,说明它是先来的那个,如果是0号位置的Slot,就进行阻塞,如果是非0的Slot,就自旋,直到超时或取消

5. 如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽

private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // Create in case occupying
        int index = hashIndex();                  // Index of current slot
        int fails = 0;                            // Number of CAS failures

        for (;;) {
            Object y;                             // Contents of current slot
            Slot slot = arena[index];
            if (slot == null)                     // Lazily initialize slots
                createSlot(index);                // Continue loop to reread
            else if ((y = slot.get()) != null &&  // Try to fulfill
                     slot.compareAndSet(y, null)) {
                Node you = (Node)y;               // Transfer item
                if (you.compareAndSet(null, item)) {
                    LockSupport.unpark(you.waiter);
                    return you.item;
                }                                 // Else cancelled; continue
            }
            else if (y == null &&                 // Try to occupy
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   // Blocking wait for slot 0
                    return timed ?
                        awaitNanos(me, slot, nanos) :
                        await(me, slot);
                Object v = spinWait(me, slot);    // Spin wait for non-0
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // Throw away cancelled node
                int m = max.get();
                if (m > (index >>>= 1))           // Decrease index
                    max.compareAndSet(m, m - 1);  // Maybe shrink table
            }
            else if (++fails > 1) {               // Allow 2 fails on 1st slot
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;                // Grow on 3rd failed slot
                else if (--index < 0)
                    index = m;                    // Circularly traverse
            }
        }
    }

更多Exchanger算法的细节请参考这篇

http://coderbee.net/index.php/concurrent/20140424/897

下面用一个测试用例来测试Exchanger的功能。最简单的一个Exchanger的使用场景有5个组件

1个Exchanger, 1个生产者,1个生产者容器,1个消费者,1个消费者容器

当生产者把自己的容器生产满了,就在Exchanger栅栏处等待消费者拿空的容器和它交换

当消费者把自己的容器消费空了,就在Exchanger栅栏处等待生产者拿满的容器和它交换

package com.lock.test;

import java.util.concurrent.Exchanger;

public class ExchangerUsecase {
	private static Exchanger<Buffer<Integer>> exchanger = new Exchanger<Buffer<Integer>>();
	private static Buffer<Integer> emptyBuffer = new Buffer<Integer>();
	private static Buffer<Integer> fullBuffer = new Buffer<Integer>();
	
	private static class Buffer<T>{
		private T[] cache = (T[])(new Object[2]);
		private int index = 0;
		
		public void add(T item){
			cache[index++] = item;
		}
		
		public T take(){
			return cache[--index];
		}
		
		public boolean isEmpty(){
			return index == 0;
		}
		
		public boolean isFull(){
			return index == cache.length;
		}
	}
	
	public static void main(String[] args){
		Runnable provider = new Runnable(){
			Buffer<Integer> currentBuffer = emptyBuffer;
			private int exchangeCount = 0;
			@Override
			public void run() {
				while(currentBuffer != null && exchangeCount <= 1){
					if(!currentBuffer.isFull()){
						System.out.println("Provider added one item");
						currentBuffer.add(1);
					}else{
						try {
							currentBuffer = exchanger.exchange(currentBuffer);
							exchangeCount ++;
							Thread.sleep(2000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
				
			}
			
		};
		
		Runnable consumer = new Runnable(){
			Buffer<Integer> currentBuffer = fullBuffer;
			private int exchangeCount = 0;
			@Override
			public void run() {
				while(currentBuffer != null  && exchangeCount <= 2){
					if(!currentBuffer.isEmpty()){
						System.out.println("Consumer took one item");
						currentBuffer.take();
					}else{
						try {
							currentBuffer = exchanger.exchange(currentBuffer);
							exchangeCount ++;
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
				
			}
			
		};
		
		new Thread(provider).start();
		new Thread(consumer).start();
	}
}

private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins;
            else
                tryCancel(node, slot);
        }
    } 

测试结果显示生产者先生成了两个,然后满了,就等待消费者和它交换。交换后消费者消费了两个,再次等待交换。生产者又生成满了一次,再次交换。如果不设置退出机制,双方会一直生产和消费下去,所以在测试用例中限制了交换两次

Provider added one item
Provider added one item
Consumer took one item
Consumer took one item
Provider added one item
Provider added one item
Consumer took one item
Consumer took one item

抱歉!评论已关闭.