现在的位置: 首页 > 综合 > 正文

再论 无锁数据结构(下)

2014年03月22日 ⁄ 综合 ⁄ 共 6941字 ⁄ 字号 评论关闭

上回说到实现了一个无锁的 WRRM 的指针保护算法,但存在一个问题是:写线程在更新指针之后,更新游离对象引用计数之前,被强制杀死,会造成读取线程的活锁,这有悖一个原则……好像叫什么“写无关原则”,即使一个线程崩溃也不该对其他线程造成太大影响(当然,所有线程全部被杀的话,啥算法都玩完), 下面的方法,可解决那个个可能存在的活锁问题:

在object里面再增加一个成员,int nTmpRelease ,用来在游离对象的引用计数变为有效值前记录临时的释放次数,在写线程更新游离对象引用计数时,更新为当前的引用计数,减去这个值。其他实现不变。

具体代码如下:

#include "stdafx.h"
#include <windows.h>
#include <winbase.h>
#include <process.h>
#include <stdio.h>

//对象的引用
struct object
{
    union
    {
        struct
        {
            void* pAnyRes;    //point to any resource etc list,map...
            int    nref_count;
        };
        __int64 nall;
    };
    int nTmpRelease;    //对象临时使用的,释放计数,为解决 写线程意外死亡造成的 活锁问题
};
typedef struct object object;

//对象的拥有者
struct owner
{
    union
    {
        struct
        {
            object* pobject;
            int nref_count;
        };
        __int64 nall;
    };
};
typedef struct owner owner;

#ifdef _DEBUG
LONG g_nNewCount = 0;
LONG g_nDelCount = 0;
#define DEBUG_INC( X )    InterlockedIncrement( (volatile long*)&X)
#else
#define DEBUG_INC( X )   
#endif

#define INVALID_REF_COUNT    (-100)    //标识一个无效的引用计数
//全局的引用者
owner    g_owner;
//获得对象指针
object* Acquire()
{
    owner stuOld,stuNew;
    do
    {
        stuOld.pobject = g_owner.pobject;
        stuOld.nref_count = g_owner.nref_count;
       
        stuNew.pobject = stuOld.pobject;
        stuNew.nref_count = stuOld.nref_count + 1;
       
    } while ( stuOld.nall != InterlockedCompareExchange64( (LONGLONG volatile*)&g_owner.nall , stuNew.nall , stuOld.nall ) );
    return stuNew.pobject;
}

void Release( object* pobject )
{
    bool bReleaseInOwner = (pobject == g_owner.pobject);
    if( bReleaseInOwner )//当前对象还是当初访问的对象
    {
        owner stuOld,stuNew;
        owner stuTest;
        do
        {
            stuOld.pobject = stuNew.pobject = pobject;
            stuOld.nref_count = g_owner.nref_count;
            stuNew.nref_count = stuOld.nref_count - 1;
            stuTest.nall = InterlockedCompareExchange64( (volatile __int64*)&g_owner , stuNew.nall , stuOld.nall );
            if( stuTest.nall == stuOld.nall )
            {
                break;
            }
            else if( stuTest.pobject != stuOld.pobject )//原对象已被更新
            {
                bReleaseInOwner = false;
                break;
            }
        } while ( true );
    }
   
    if( !bReleaseInOwner )
    {
        bool bReleaseObject = true;
        if( pobject->nref_count == INVALID_REF_COUNT )
        {
            object stuOldObj,stuNewObj,stuTestObj;
            do
            {
                stuOldObj.nref_count = INVALID_REF_COUNT;
                stuOldObj.nTmpRelease = pobject->nTmpRelease;
                __int64 llOld = *(__int64*)&stuOldObj.nref_count;
                stuNewObj.nref_count = stuOldObj.nref_count;
                stuNewObj.nTmpRelease = stuOldObj.nTmpRelease + 1;
                __int64 llNew = *(__int64*)&stuNewObj.nref_count;
                __int64 llTest = InterlockedCompareExchange64( (volatile __int64*)&pobject->nref_count , llNew , llOld );
                *((__int64*)&(stuTestObj.nref_count)) = llTest;
                if( llTest == llOld )
                {
                    bReleaseObject = false;
                    break;
                }
                else if( stuTestObj.nref_count != stuOldObj.nref_count )//对象的引用计数已经更新
                {
                    break;
                }
            } while ( true );
        }
        if( bReleaseObject )
        {
            long nRet = InterlockedDecrement( (volatile long*)&pobject->nref_count );
            if( nRet == 0 )
            {
                DEBUG_INC( g_nDelCount );
                delete pobject;
            }
            else if( nRet < 0 )
            {
                DEBUG_INC( g_nDelCount );
                delete pobject;    //must error
            }
        }
    }
}

bool Update( object* pOld , object* pNew )
{
    owner stuOld,stuNew;
    owner stuTest;
    bool bret = false;
    do
    {
        stuOld.pobject = pOld;
       
        stuNew.pobject = pNew;
        stuNew.nref_count = 1;
        pNew->nref_count = INVALID_REF_COUNT;
        pNew->nTmpRelease = 0;
       
        stuOld.nref_count = g_owner.nref_count;
       
        InterlockedCompareExchange64( (volatile __int64*)pOld , stuOld.nall , stuOld.nall );
       
        stuTest.nall = InterlockedCompareExchange64( (volatile __int64*)&g_owner , stuNew.nall , stuOld.nall );
        if( stuTest.nall == stuOld.nall )
        {
            bret = true;
            //复制引用计数
            object stuOldObj,stuNewObj,stuTestObj;
            do
            {
                stuOldObj.nref_count = pOld->nref_count;
                stuOldObj.nTmpRelease = pOld->nTmpRelease;
                __int64 llOld = *(__int64*)&stuOldObj.nref_count;
                stuNewObj.nref_count = stuOld.nref_count - stuOldObj.nTmpRelease;    //注意,这里更新
                stuNewObj.nTmpRelease = 0;
                __int64 llNew = *(__int64*)&stuNewObj.nref_count;
                __int64 llTest = InterlockedCompareExchange64( (volatile __int64*)&pOld->nref_count , llNew , llOld );
                *((__int64*)&(stuTestObj.nref_count)) = llTest;
                if( llTest == llOld )
                {
                    if( stuNewObj.nref_count == 0 )//实际上是不可能的,因为在Update的时候 WriteThread自己还保留着1个,全局还有1个至少2个
                    {
                        delete pOld;
                    }
                    break;
                }
            } while (true);
            break;
        }
        else if( stuTest.pobject != stuOld.pobject )
        {
            break;
        }
    } while ( true );
    return bret;
}

UINT CALLBACK ReadThread( void* pCount )
{
    UINT uCount = (UINT)pCount;
    do
    {
        object* pObject = Acquire();
        Release( pObject );
        uCount --;
    } while ( uCount > 0 );
    return 0;
}

UINT CALLBACK WriteThread( void* pCount )
{
    UINT uCount = (UINT)pCount;
    do
    {
        object* pObject = NULL;
        object* pNew = NULL;
        do
        {
            if( pNew != NULL )
            {
                DEBUG_INC( g_nDelCount );
                delete pNew;
            }
            if( pObject != NULL )
            {
                Release( pObject );
            }
            pObject = Acquire();
            DEBUG_INC( g_nNewCount );
            pNew = new object;
        } while ( !Update( pObject , pNew ) );
        if( pObject != NULL )
        {
            Release( pObject );    //一次是自己的
            Release( pObject );    //一次是全局的那次
        }
        uCount --;
    } while ( uCount > 0 );
    return 0;
}

int main(int argc, char* argv[])
{
    DEBUG_INC( g_nNewCount );
    g_owner.pobject = new object;
    g_owner.pobject->nTmpRelease = 0;
    g_owner.nref_count = 1;
#define READ_COUNT    1000000
#define WRITE_COUNT    100000
   
    HANDLE pWaitHandle[3];
    UINT uThreadID = 0;
    pWaitHandle[0] = (HANDLE)_beginthreadex( NULL , 0 , ReadThread , (void*)READ_COUNT , 0 ,  &uThreadID );
    pWaitHandle[1] = (HANDLE)_beginthreadex( NULL , 0 , ReadThread , (void*)READ_COUNT , 0 ,  &uThreadID );
    pWaitHandle[2] = (HANDLE)_beginthreadex( NULL , 0 , WriteThread , (void*)WRITE_COUNT , 0 ,  &uThreadID );
    WriteThread( (void*)WRITE_COUNT );
    WaitForMultipleObjects( 3 , pWaitHandle , TRUE , INFINITE );
#ifdef _DEBUG
    printf( "new %d delete %d det is %d/n",g_nNewCount , g_nDelCount , g_nNewCount - g_nDelCount );
#endif
    return 0;
}

至此,一个WRRM 的指针保护对象终于完成了,不会发生活锁,如果有线程被异常终止,也和Hazard 指针一样会造成一点泄漏。

不过这个实现 目的旨在保护指针指向的资源,任何C/C++对象都可使用(C语言把new/delete换成malloc/free),系统开销也

比Hazard 指针方案开销要小,方案的结构上好像有点共轭关系,就叫 无锁共轭数据结构吧 。

参考文献

《lock-free Data Structures》  http://erdani.org/publications/cuj-2004-10.pdf

刘未鹏 同学翻译的:(真是个大善人啊)

http://blog.csdn.net/pongba/archive/2006/01/26/588638.aspx

http://blog.csdn.net/pongba/archive/2006/01/29/589864.aspx

 

抱歉!评论已关闭.