现在的位置: 首页 > 综合 > 正文

AbstractQueuedSynchronizer源码解析之ReentrantLock(一)

2013年12月18日 ⁄ 综合 ⁄ 共 11750字 ⁄ 字号 评论关闭

    

在上一篇笔记中提到concurrent包中semaphores, barriers, and latches等Synchronizer的介绍和使用,于是想深入的了解这些Synchronizer的原理和源码。可以从上图中观察到这些Synchronizer内部都有自己的Sync类,而且Sync类全部继承AbstractQueuedSynchronizer抽象类,可以看出AbstractQueuedSynchronizer类就是整个concurrent包的Synchronizer
Framework。Doug Lea在他的论文《
The java.util.concurrent Synchronizer Framework》对AbstractQueuedSynchronizer作了详尽的说明,花了很长的时间来看了这篇论文,论文中提到:

Support for these operations requires the coordination of three basic components:

* Atomically managing synchronization state

* Blocking and unblocking threads

* Maintaining queues 

一个Synchronizer需要三个基本组件,再来看看AbstractQueuedSynchronizer的源码:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Head of the wait queue, lazily initialized. Except for initialization, it
* is modified only via method setHead. Note: If head exists, its waitStatus
* is guaranteed not to be CANCELLED.
*/

private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via method enq
* to add new wait node.
*/

private transient volatile Node tail;
上面代码声明了state和队列wait queue(FIFO类型)。AbstractQueuedSynchronizer采用LockSupport来支持Block,LockSupport的park()和unpark(Thread
thread)可以block和unblock一个线程。

再来看看wait queue中Node的结构,首先看源码中Node class的说明:
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten)lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking
synchronizers, but 
use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A
"status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field
does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend.
 
So the currently released contender thread may need to rewait.


To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.

<pre>
+------+  prev +-----+       +-----+
head |      | <---- |     | <---- |     |  tail
+------+       +-----+       +-----+
</pre>


Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly,
dequeing involves only updating the "head". However,
it takes a bit more work for nodes to determine who their successors are,in part to deal
with possible cancellation due to timeouts and interrupts.



The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally)
relinked to a non-cancelled predecessor.
 
For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/


We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next
node to wake up by traversing next link to determine which thread it is.Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically
updated "tail" when a node's successor appears to be null. 
(Or, said differently, thenext-links are an optimization so that we don't usually
need a backward scan.)



Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled
node is ahead or behind us. This is dealt with by always
 unparking successors upon cancellation, allowing them to stabilize on a new predecessor.


CLH queues need a dummy header node to get
started
. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed
and head and tail pointers are set upon first contention.



Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues
because they are only accessed when exclusively held. 
Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred
to the main queue. A special value of status field is used to mark which queue a node is on.
 说明很长,黑体部分是关键。继续看Node class的源码:
static final class Node {
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
定义了一些常量,指明了wait status的几个状态。
/**
* Status field, taking on only the values: 
SIGNAL: The successor of this node is (or
will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then on failure,block. 
* CANCELLED:This node is cancelled due to timeout or interrupt. Nodes never leave this state. In particular, a thread with cancelled node never again blocks. 
* CONDITION: This node is currently on a condition queue. It will not be used as a sync queue node until transferred. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.) 
* 0: None of the above


* The values are arranged numerically to simplify use. Non-negative
* values mean that a node doesn't need to signal. So, most code doesn't
* need to check for particular values, just for sign.

* The field is initialized to 0 for normal sync nodes, and CONDITION
* for condition nodes. It is modified only using CAS.
*/
volatile int waitStatus;
/**
Link to predecessor node that current node/thread
relies on for
* checking waitStatus. Assigned during enqueing, and nulled out (for
* sake of GC) only upon dequeuing. Also, upon cancellation of a
* predecessor, we short-circuit while finding a non-cancelled one,
* which will always exist because the head node is never cancelled: A
* node becomes head only as a result of successful acquire. A cancelled
* thread never succeeds in acquiring, and a thread only cancels itself,
* not any other node.

*/
volatile Node prev;

/**
Link to the successor node that the current
node/thread unparks upon
* release. Assigned once during enqueuing, and nulled out (for sake of
* GC) when no longer needed. Upon cancellation, we cannot adjust this
* field, but can notice status and bypass the node if cancelled. The
* enq operation does not assign next field of a predecessor until after
* attachment, so seeing a null next field does not necessarily mean
* that node is at end of queue. However, if a next field appears to be
* null, we can scan prev's from the tail to double-check.

*/
volatile Node next;

/**
* The thread that enqueued this node. Initialized on construction and
* nulled out after use.
*/
volatile Thread thread;

/**
* Link to next node waiting on condition,
or the special value SHARED.
* Because condition queues are accessed only when holding in exclusive
* mode, we just need a simple linked queue to hold nodes while they are
* waiting on conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive, we save a
* field by using special value to indicate shared mode.

*/
Node nextWaiter;

这几个变量都非常关键,只有理解每个变量所代表的含义,才能继续着去看源码。

       



Synchronizer采用Template Pattern,AbstractQueuedSynchronizer是个抽象类,其中没有抽象方法,但有五个protected方法需要具体子类去覆盖

/**
Attempts to acquire in exclusive mode. This
method should query if the
* state of the object permits it to be acquired in the exclusive mode, and
* if so to acquire it.


This method is always invoked by the thread
performing acquire. If this
* method reports failure, the acquire method may queue the thread, if it is
* not already queued, until it is signalled by a release from some other
* thread
. This can be used to implement method {@link Lock#tryLock()}.

* The default implementation throws {@link UnsupportedOperationException}.

* @param arg
*            the acquire argument. This value is always the one passed to
*            an acquire method, or is the value saved on entry to a
*            condition wait. The value is otherwise uninterpreted and can
*            represent anything you like.
* @return {@code true} if successful. Upon success, this object has been
*         acquired.
* @throws IllegalMonitorStateException
*             if acquiring would place this synchronizer in an illegal
*             state. This exception must be thrown in a consistent fashion
*             for synchronization to work correctly.
* @throws UnsupportedOperationException
*             if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

/**
* Attempts to set the state to reflect a release
in exclusive mode.


This method is always invoked by the thread
performing release.


* The default implementation throws {@link UnsupportedOperationException}.

* @param arg
*            the release argument. This value is always the one passed to a
*            release method, or the current state value upon entry to a
*            condition wait. The value is otherwise uninterpreted and can
*            represent anything you like.
* @return {@code true} if this object is now in a fully released state, so
*         that any waiting threads may attempt to acquire; and {@code
*         false} otherwise.
* @throws IllegalMonitorStateException
*             if releasing would place this synchronizer in an illegal
*             state. This exception must be thrown in a consistent fashion
*             for synchronization to work correctly.
* @throws UnsupportedOperationException
*             if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

/**
Attempts to acquire in shared mode. This
method should query if the state
* of the object permits it to be acquired in the shared mode, and if so to
* acquire it.


* This method is always invoked by the thread performing acquire. If this
* method reports failure, the acquire method may queue the thread, if it is
* not already queued, until it is signalled by a release from some other
* thread.

* The default implementation throws {@link UnsupportedOperationException}.

* @param arg
*            the acquire argument. This value is always the one passed to
*            an acquire method, or is the value saved on entry to a
*            condition wait. The value is otherwise uninterpreted and can
*            represent anything you like.
* @return a negative value on failure; zero if acquisition in shared mode
*         succeeded but no subsequent shared-mode acquire can succeed; and
*         a positive value if acquisition in shared mode succeeded and
*         subsequent shared-mode acquires might also succeed, in which case
*         a subsequent waiting thread must check availability. (Support for
*         three different return values enables this method to be used in
*         contexts where acquires only sometimes act exclusively.) Upon
*         success, this object has been acquired.
* @throws IllegalMonitorStateException
*             if acquiring would place this synchronizer in an illegal
*             state. This exception must be thrown in a consistent fashion
*             for synchronization to work correctly.
* @throws UnsupportedOperationException
*             if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

/**
Attempts to set the state to reflect a release
in shared mode.


* This method is always invoked by the thread performing release.

* The default implementation throws {@link UnsupportedOperationException}.

* @param arg
*            the release argument. This value is always the one passed to a
*            release method, or the current state value upon entry to a
*            condition wait. The value is otherwise uninterpreted and can
*            represent anything you like.
* @return {@code true} if this release of shared mode may permit a waiting
*         acquire (shared or exclusive) to succeed; and {@code false}
*         otherwise
* @throws IllegalMonitorStateException
*             if releasing would place this synchronizer in an illegal
*             state. This exception must be thrown in a consistent fashion
*             for synchronization to work correctly.
* @throws UnsupportedOperationException
*             if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

/**
Returns {@code true} if synchronization
is held exclusively with respect
* to the current (calling) thread.
 This method is invoked upon each call to
* a non-waiting {@link ConditionObject} method. (Waiting methods instead
* invoke {@link #release}.)

* <p>
* The default implementation throws {@link UnsupportedOperationException}.
* This method is invoked internally only within {@link ConditionObject}
* methods, so need not be defined if conditions are not used.

* @return {@code true} if synchronization is held exclusively; {@code
*         false} otherwise
* @throws UnsupportedOperationException

抱歉!评论已关闭.