现在的位置: 首页 > 综合 > 正文

聊聊高并发(三十二)实现一个基于链表的无锁Set集合

2017年11月09日 ⁄ 综合 ⁄ 共 12556字 ⁄ 字号 评论关闭

Set表示一种没有重复元素的集合类,在JDK里面有HashSet的实现,底层是基于HashMap来实现的。这里实现一个简化版本的Set,有以下约束:

1. 基于链表实现,链表节点按照对象的hashCode()顺序由小到大从Head到Tail排列。

2. 假设对象的hashCode()是唯一的,这个假设实际上是不成立的,这里为了简化实现做这个假设。实际情况是HashCode是基于对象地址进行的一次Hash操作,目的是把对象根据Hash散开,所以可能有多个对象地址对应到一个HashCode,也就是哈希冲突,要做冲突的处理。常见的处理就是使用外部拉链法,在对应的Hash的节点再创建1个链表,相同HashCode的节点在这个链表上排列,再根据equals()方法来识别是否是同一个节点。这个是HashMap的基本原理。

有了以上假设,这篇从最简单的加锁的方式到最终的无锁展示一下如何一步一步演进的过程,以及最终无锁实现要考虑的要点。

1. 粗粒度锁

2. 细粒度锁

3. 乐观锁

4. 懒惰锁

5. 无锁

先看一下Set接口,1个添加方法,1个删除方法,1个检查方法

package com.lock.test;

public interface Set<T> {
	public boolean add(T item);
	
	public boolean remove(T item);
	
	public boolean contains(T item);
}

第一个实现的是使用粗粒度的锁,就是对整个链表加锁,这样肯定是线程安全的,但是效率肯定是最差的

链表节点类的定义,三个元素

1. 加入节点的元素

2. next指针指向下一个节点,明显这是个单链表结构

3. key表示item的HashCode

class Node<T>{
		T item;
		Node<T> next;
		int key;
		
		public Node(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public Node(){}
	}

粗粒度链表的实现

1. 链表维护了一个头节点,头节点始终指向最小的HashCode,头节点初始的next指针指向最大的HashCode,表示尾节点,整个链表是按照HashCode从小往大排列

2. 链表维护了一个整体的锁,add, remove, contains都加锁,保证线程安全,简单粗暴,但是效率低下

class CoarseSet<T> implements Set<T>{
		private final Node<T> head;
		private java.util.concurrent.locks.Lock lock = new ReentrantLock();
		
		public CoarseSet(){
			head = new Node<T>();
			head.key = Integer.MIN_VALUE;
			Node<T> MAX = new Node<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		public boolean add(T item){
			Node<T> pred, curr;
			int key = item.hashCode();
			lock.lock();
			try{
				pred = head;
				curr = head.next;
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				if(curr.key == key){
					return false;
				}
				Node<T> node = new Node<T>(item);
				node.next = curr;
				pred.next = node;
				return true;
			}finally{
				lock.unlock();
			}
		}
		
		public boolean remove(T item){
			Node<T> pred, curr;
			int key = item.hashCode();
			lock.lock();
			try{
				pred = head;
				curr = head.next;
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				if(curr.key == key){
					pred.next = curr.next;
					curr.next = null;
					return true;
				}
				return false;
			}finally{
				lock.unlock();
			}
		}
		
		public boolean contains(T item){
			Node<T> pred, curr;
			int key = item.hashCode();
			lock.lock();
			try{
				pred = head;
				curr = head.next;
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				return curr.key == key;
			}finally{
				lock.unlock();
			}
		}
	}

对粗粒度链表的优化可以细化锁的粒度,粗粒度锁的问题在于使用了一把锁锁住了整个链表,那么可以使用多把锁,每个节点维护一把锁,这样单个节点上锁理论上不会影响其他节点。

单链表最简单add操作需要做两步

1. 把新加入节点的next指向当前节点

2. 把前驱节点的next指向新加入的节点

node.next = curr;
pred.next = node

单链表的删除操作只需要做一步

pred.next = curr.next

如果使用细粒度锁的话,添加和删除操作要同时锁住两个节点才能保证添加和删除的正确性,不然有可能添加进来的节点指向了一个已经被删除的节点。所以需要同时控制两把锁,这也叫锁耦合,需要先获取前驱节点的锁,再获取当前节点的锁。

由于这种锁的获取是按照从前往后的顺序获取的,是一个方向的,所以不会引起死锁的问题。

死锁有两种:

1. 由获取锁的顺序引起的,顺序形成了环

2. 由资源问题引起的

来看看细粒度锁的实现,首先是带锁的Node

class NodeWithLock<T>{
		T item;
		NodeWithLock<T> next;
		int key;
		java.util.concurrent.locks.Lock lock = new ReentrantLock();
		
		public NodeWithLock(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public NodeWithLock(){}
		
		public void lock(){
			lock.lock();
		}
		
		public void unlock(){
			lock.unlock();
		}
	}

细粒度链表实现的Set

1. 获取锁时,先获取pred锁,然后获取curr的锁

2. 当pred和curr的指针往后继节点移动时,要先释放pred锁,然后让pred指向curr,然后curr再指向next节点,然后curr再上锁,这样保证了同时前后两把锁存在

class FineList<T> implements Set<T>{
		private final NodeWithLock<T> head;
		
		public FineList(){
			head = new NodeWithLock<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithLock<T> MAX = new NodeWithLock<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		public boolean add(T item){
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			head.lock();
			pred = head;
			try{
				curr = pred.next;
				curr.lock();
				try{
					while(curr.key < key){
						pred.unlock();
						pred = curr;
						curr = curr.next;
						curr.lock();
					}
					if(curr.key == key){
						return false;
					}
					NodeWithLock<T> node = new NodeWithLock<T>(item);
					node.next = curr;
					pred.next = node;
					return true;
				}finally{
					curr.unlock();
				}
			}finally{
				pred.unlock();
			}
		}
		
		public boolean remove(T item){
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			head.lock();
			pred = head;
			try{
				curr = pred.next;
				curr.lock();
				try{
					while(curr.key < key){
						pred.unlock();
						pred = curr;
						curr = curr.next;
						curr.lock();
					}
					if(curr.key == key){
						pred.next = curr.next;
						curr.next = null;
						return true;
					}
					return false;
				}finally{
					curr.unlock();
				}
			}finally{
				pred.unlock();
			}
		}
		
		public boolean contains(T item){
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			head.lock();
			pred = head;
			try{
				curr = pred.next;
				curr.lock();
				try{
					while(curr.key < key){
						pred.unlock();
						pred = curr;
						curr = curr.next;
						curr.lock();
					}
					return curr.key == key;
				}finally{
					curr.unlock();
				}
			}finally{
				pred.unlock();
			}
		}
	}

细粒度的锁对粗粒度的锁有一定的优化,但是还存在问题:

当前面的节点被锁住时,后面的节点无法操作,必须等待前面的锁释放

所以一种自然而然的想法就是“只在需要加锁的时候再加锁”,也就是乐观锁

1. 只有在寻找到要加锁位置的时候才加锁,之前不加锁

2. 需要加锁时,先加锁,再进行验证是否现场已经被修改

3. 如果被修改就需要从头开始再次查找,是一个轮询的过程

所以乐观锁也是一个轮询检查的过程。

来看看检查的方法,检查的目的有两个:

1. 确认要操作的现场没有被修改,也就是说pred.next == curr

2. 确认pred和curr都是可以到达的,没有被物理删除,也就是node = node.next可以到达

private boolean validate(NodeWithLock<T> pred, NodeWithLock<T> curr){
			NodeWithLock<T> node = head;
			while(node.key <= pred.key){
				if(node.key == pred.key){
					return pred.next == curr;
				}
				node = node.next;
			}
			return false;
		}

再来看看基于乐观锁的Set实现

1. 先不加锁,直到找到要处理的现场,也就是while(curr.key < key)退出的地方。退出之后有两种情况,curr.key == key和curr.key > key。

2. 找到现场后,要对pred和curr都加锁,加锁顺序也是从前往后的顺序

3. 验证现场未被修改,然后进行操作,如果被修改了,就释放锁,再次从头节点开始轮询操作

class OptimisticSet<T> implements Set<T>{

		private final NodeWithLock<T> head;
		
		public OptimisticSet(){
			head = new NodeWithLock<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithLock<T> MAX = new NodeWithLock<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		@Override
		public boolean add(T item) {
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							return false;
						}
						NodeWithLock<T> node = new NodeWithLock<T>(item);
						node.next = curr;
						pred.next = node;
						return true;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean remove(T item) {
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							pred.next = curr.next;
							curr.next = null;
							return true;
						}
						return false;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean contains(T item) {
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						return curr.key == key;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}
		
		private boolean validate(NodeWithLock<T> pred, NodeWithLock<T> curr){
			NodeWithLock<T> node = head;
			while(node.key <= pred.key){
				if(node.key == pred.key){
					return pred.next == curr;
				}
				node = node.next;
			}
			return false;
		}
	}

乐观锁减少了很大的一部分锁的争用问题,但是它还是存在一定的锁的冲突,尤其是contains操作时也需要加锁。实际上如果contains一个节点要被删除的话,可以先标记成逻辑删除,再进行物理删除。因为要删除时是需要加锁的,所以最后的物理删除肯定会成功,如果先标记成逻辑删除,那么contains操作的时候其实是可以不需要锁的,如果它看到了节点被标记逻辑删除了,那么肯定就contains失败了,如果没有看到逻辑删除,那么表示在contains操作的时候是还没有被删除的,即使它可能已经被加锁准备删除了,但是contains看到的是一个状态的快照,当contains操作的时候确实是存在的,所以返回true也是正确的。

快照这个思想在并发编程中是很重要的思想,因为并发的存在,如果不使用同步手段,比如加锁,CAS操作,那么看到的都很可能是快照,是一瞬间的状态,不能完全根据这一瞬间的状态来决定后续操作,但是看到快照的那一瞬间的操作确实是成功的

所以可以通过把删除操作分为逻辑删除和物理删除两个节点来把contains操作的锁去掉,因为contains操作也是一个很频繁的操作。可以通过给节点添加一个状态来表示是否被删除,最简单的做法就是添加一个volatile字段保证状态的可见性

static class NodeWithLockAndMark<T>{
		T item;
		NodeWithLockAndMark<T> next;
		int key;
		java.util.concurrent.locks.Lock lock = new ReentrantLock();
                // 标记逻辑删除的状态   
                volatile boolean marked;
		
		public NodeWithLockAndMark(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public NodeWithLockAndMark(){}
		
		public void lock(){
			lock.lock();
		}
		
		public void unlock(){
			lock.unlock();
		}
	}

懒惰的Set实现是在乐观锁的基础上实现的,有了逻辑状态,validate方法就简化了,只需要判断现场的节点是否被标记删除了,并且现场未被修改过

private boolean validate(NodeWithLockAndMark<T> pred, NodeWithLockAndMark<T> curr){
			return !pred.marked && !curr.marked && pred.next == curr;
		}

看看懒惰Set的实现

1. add和remove操作和乐观锁的过程基本一致,只是在remove时,先标记节点的逻辑删除状态,再物理删除

2. contains方法可以去掉锁了,注意的是它也是保证快照的正确性

class LazySet<T> implements Set<T>{

		private final NodeWithLockAndMark<T> head;
		
		public LazySet(){
			head = new NodeWithLockAndMark<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithLockAndMark<T> MAX = new NodeWithLockAndMark<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		@Override
		public boolean add(T item) {
			NodeWithLockAndMark<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							return false;
						}
						NodeWithLockAndMark<T> node = new NodeWithLockAndMark<T>(item);
						node.next = curr;
						pred.next = node;
						return true;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean remove(T item) {
			NodeWithLockAndMark<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							// logical remove Node, use volatile to make sure visibility
							curr.marked = true;
							pred.next = curr.next;
							curr.next = null;
							return true;
						}
						return false;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean contains(T item) {
			NodeWithLockAndMark<T> curr = head;
			int key = item.hashCode();
			while(curr.key < key){
				curr = curr.next;
			}
			return curr.key == key && !curr.marked;
		}
		
		private boolean validate(NodeWithLockAndMark<T> pred, NodeWithLockAndMark<T> curr){
			return !pred.marked && !curr.marked && pred.next == curr;
		}
	}

最后来看看真正无锁的Set的实现,在懒惰Set实现中已经把contains方法的锁去了,无锁实现需要把add和remove中的锁也去掉。无锁的Set需要保证的是如果一个节点被标记逻辑删除了,那么它的next字段就不能被使用了 ,也就是不能被修改了。否则的话可能出现一个节点被标记逻辑删除了,但是其他现场没看到,还在继续使用它的next字段,这样就出现了无效的add或remove。

AtomicMarkableReference可以保证这个场景的原子性,它维护了marked状态和next引用的一个二元状态,这个二元状态的修改是原子的。AtomicMarkableReference的更多内容请看这篇聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

所以我们对Node再次进行改进,把next字段改成了AtomicMarkableReference

这里隐含了三个理解无锁实现的关键点

1. next的标记状态表示的是当前节点是否被逻辑删除,通过CAS判断这个状态可以知道当前节点是否被逻辑删除了

2. 通过CAS比较next的Reference就可以知道当前节点和next节点的物理关系是否被修改了,也是就是可以知道下一个节点是否被物理删除了

3. 由于next标记的状态是当前节点的状态,所以当前节点是无法知道下一个节点是否被逻辑删除了的。这个点很重要,因为无锁的添加可能会出现添加的节点的后续节点是一个已经被逻辑删除,但是还没有物理删除的节点

static class NodeWithAtomicMark<T>{
		T item;
		AtomicMarkableReference<NodeWithAtomicMark<T>> next = new AtomicMarkableReference<NodeWithAtomicMark<T>>(null, false);
		int key;
		
		public NodeWithAtomicMark(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public NodeWithAtomicMark(){}
		
	}

理解上面的3点是理解无锁实现的关键。前两点能保证节点知道自己是否被标记了,并且知道后继节点是否被删除了。第三点的存在要求无锁实现必须有一个单独的物理删除节点的过程。

所以抽取出了一个单独的类来寻找要操作的现场,并且寻找过程中物理地删除节点。

Position类表示要操作的现场,它返回pred和curr节点的指针。

findPosition方法从头节点往后寻找位置,边寻找边物理删除被标记逻辑删除的节点。最后返回的时候的Postion的curr.key >= 要操作节点的HashCode

class Position<T>{
		NodeWithAtomicMark<T> pred, curr;
		public Position(NodeWithAtomicMark<T> pred, NodeWithAtomicMark<T> curr){
			this.pred = pred;
			this.curr = curr;
		}
		
		public Position(){}
		
		public Position<T> findPosition(NodeWithAtomicMark<T> head, int key){
			boolean[] markHolder = new boolean[1];
			NodeWithAtomicMark<T> pred, curr, succ;
			boolean success;
			retry: while(true){
				pred = head;
				curr = pred.next.getReference();
				// 清除被逻辑删除的节点,也就是被标记的节点
				while(true){
					succ = curr.next.get(markHolder);
					if(markHolder[0]){
						success = pred.next.compareAndSet(curr, succ, false, false);
						if(!success){
							continue retry;
						}
						curr = succ;
						succ = curr.next.get(markHolder);
					}
				
					if(curr.key >= key){
						return new Position<T>(pred, curr);
					}
					pred = curr;
					curr = succ;
				}
			}
		}
	}

来看看无锁Set的实现

1. 无锁添加的时候,先用Position来寻找现场,隐含了一点是寻找的时候同时物理删除了节点

2. 判断是否存在的时候也是判断的快照状态,只保证弱一致性

3. 如果找到了要插入的现场,新建一个节点,并把next指向curr。这里需要理解curr可能被标记了逻辑删除,后面pred的next字段的CAS操作会保证curr没有被物理删除,但是如上面第三点说的,无法知道curr是否被逻辑删除了,所以新加入的节点的next可能指向了一个被逻辑删除的节点。但是不影响逻辑,因为下一个Postion.findPosition操作会正确地删除被标记逻辑删除的节点。contains操作也会判断节点是否被逻辑删除

4. pred.next通过AtomicMarkableReference的CAS操作保证了curr节点没有被物理删除,如果CAS成功,说明添加成功了

5. 删除也一样,先寻找现场,同时物理删除被标记的节点

6. 找到现场后,先CAS修改当前节点的状态为被标记逻辑删除,

7. 尝试物理删除一次,如果这里物理删除成功了,那么如果正好有一个add在同样的现场操作,那么add的Pred的CAS操作会失败。如果这里物理删除失败,那么就把逻辑删除的节点保留在链表中,等下一次findPosition操作来真正的物理删除

8. contains操作从头节点开始遍历,判断节点内容,并且没有被标记逻辑删除

9. 所有的CAS操作都配合了轮询,这样保证最终CAS操作的成功

class LockFreeSet<T> implements Set<T>{

		private final NodeWithAtomicMark<T> head;
		
		public LockFreeSet(){
			head = new NodeWithAtomicMark<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithAtomicMark<T> MAX = new NodeWithAtomicMark<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next.set(MAX, false);
		}
		@Override
		public boolean add(T item) {
			NodeWithAtomicMark<T> pred, curr;
			int key = item.hashCode();
			Position p = new Position();
			while(true){
				Position position = p.findPosition(head, key);
				pred = position.pred;
				curr = position.curr;
				// 如果已经存在,只能保证弱一致性,这里只是一个当时状态的快照,有可能相等的节点在这之后被其他线程已经被删除了,但是这里不能看到
				if(curr.key == key){
					return false;
				}
				NodeWithAtomicMark<T> node = new NodeWithAtomicMark<T>(item);
				node.next.set(curr, false);
				// 二元状态保证:
				// 1.pred是未被标记的;
				// 2.curr未被物理删除,还是pred的后续节点,这时候即使curr已经被逻辑删除了,也不影响添加成功。curr会在下一次find被物理删除
				// 二元状态无法知道curr是否被逻辑删除,因为mark表示的是自己节点的状态。但是curr是否被逻辑删除不影响添加成功,只要不被物理删除就行
				if(pred.next.compareAndSet(curr, node, false, false)){
					return true;
				}
			}
		}

		@Override
		public boolean remove(T item) {
			NodeWithAtomicMark<T> pred, curr;
			int key = item.hashCode();
			Position p = new Position();
			boolean success = false;
			while(true){
				Position position = p.findPosition(head, key);
				pred = position.pred;
				curr = position.curr;
				// 如果已经存在,只能保证弱一致性,这里只是一个当时状态的快照,有可能相等的节点在这之后被其他线程已经被删除了,但是这里不能看到
				if(curr.key == key){
					NodeWithAtomicMark<T> succ = curr.next.getReference();
					success = curr.next.compareAndSet(succ, succ, false, true);
					if(!success){
						continue;
					}
					pred.next.compareAndSet(curr, succ, false, false);
					return true;
				}else{
					return false;
				}
			}
		}

		@Override
		public boolean contains(T item) {
			NodeWithAtomicMark<T> curr = head;
			int key = item.hashCode();
			boolean[] markHolder = new boolean[1];
			while(curr.key < key){
				curr = curr.next.getReference();
				// 检查是否被删除
				curr.next.get(markHolder);
			}
			return curr.key == key && !markHolder[0];
		}
		
	}

抱歉!评论已关闭.