现在的位置: 首页 > 综合 > 正文

用MUTEX实现读写锁

2013年05月08日 ⁄ 综合 ⁄ 共 11560字 ⁄ 字号 评论关闭

这个需求很明确,就不罗嗦了。

第一个版本:

class ResManager {
public:
    ResManager(HANDLE hHandle) : m_hHandle (hHandle) {
        WaitForSingleObject(m_hHandle, INFINITE);
    }

    ~ResManager() {
        ReleaseMutex(m_hHandle);
    }

private:
    HANDLE m_hHandle; 
};

class WriteMsgHelper {
public:
    static WriteMsgHelper* getInstance() {
        if (m_pInstance == 0) {
            m_pInstance = new WriteMsgHelper();
            m_hMutex = CreateMutex(0, false, 0);
        }
        return m_pInstance;
    }

    void WriteMsg(const string& str) {
        ResManager rm(m_hMutex);
        cout << str.c_str() << endl; 
    }
private:
    WriteMsgHelper() {}
    static WriteMsgHelper* m_pInstance; 
    static HANDLE m_hMutex; 
};

WriteMsgHelper* WriteMsgHelper::m_pInstance = 0; 
HANDLE WriteMsgHelper::m_hMutex = INVALID_HANDLE_VALUE;

class BaseLock {
public:
    virtual ~BaseLock() {}
    virtual bool getLock() = 0;
    virtual void releaseLock() = 0;
};

class WriteLock : public BaseLock 
{
public:
    WriteLock(int &iReaderCount, HANDLE &hReaderMutex) 
        : m_iReader_Count(iReaderCount),
          m_hMutex_Reader(hReaderMutex)
    {

    }

    bool getLock() {
        WaitForSingleObject(m_hMutex_Reader, INFINITE);

        if (m_iReader_Count == 0) {
            return true; 
        }
        else {
            ReleaseMutex(m_hMutex_Reader);
            return false;
        }
    }

    void releaseLock() {
        ReleaseMutex(m_hMutex_Reader);
    }
private:
    HANDLE &m_hMutex_Reader;
    int &m_iReader_Count; 
};

class ReadLock : public BaseLock
{
public:
    ReadLock(int &iReaderCount, HANDLE &hReaderMutex) : m_iReaderCount(iReaderCount), m_hMutex_ReaderCount(hReaderMutex) {}
    bool getLock() {
        ResManager rm(m_hMutex_ReaderCount);
        m_iReaderCount++;
        return true;
    }
    void releaseLock() {
        ResManager rm(m_hMutex_ReaderCount);
        m_iReaderCount--;
    }
private:
    int &m_iReaderCount;
    HANDLE &m_hMutex_ReaderCount; 
};

struct ThreadParam {
    int iThreadID;
    BaseLock *pLock;
    int iRand;

    ThreadParam(int aiThreadID, BaseLock *apLock, int aiRand) 
        : iThreadID(aiThreadID), pLock(apLock), iRand(aiRand) {

    } 
};

const int g_iMaxSleepSeconds = 5; 

// thread try to write
DWORD WINAPI WriteThread(PVOID pvParam) {
    char buf[200] = {0};
    ThreadParam *pThreadParam = (ThreadParam *)pvParam;
    sprintf(buf, "Thread %d try to get write lock", pThreadParam->iThreadID);
    WriteMsgHelper::getInstance()->WriteMsg(buf);

    bool result = pThreadParam->pLock->getLock();
    if (!result) {
        sprintf(buf, "Thread %d get lock failed", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
    }
    else {
        sprintf(buf, "Thread %d get lock successfully", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);

        int iSleep = pThreadParam->iRand;
        sprintf(buf, "Thread %d write %d seconds", pThreadParam->iThreadID, iSleep);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        Sleep(iSleep * 1000);
        sprintf(buf, "Thread %d release lock", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        pThreadParam->pLock->releaseLock();
    }
    delete(pThreadParam);
    return 0; 
}

// thread try to read
DWORD WINAPI ReadThread(PVOID pvParam) {
    char buf[200] = {0};
    ThreadParam *pThreadParam = (ThreadParam*)pvParam;
    sprintf(buf, "Thread %d try to get read lock", pThreadParam->iThreadID);
    WriteMsgHelper::getInstance()->WriteMsg(buf);

    bool result = pThreadParam->pLock->getLock();
    if (!result) {
        sprintf(buf, "Thread %d get lock failed", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
    }
    else {
        sprintf(buf, "Thread %d get lock successfully", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        int iSleep = pThreadParam->iRand;
        sprintf(buf, "Thread %d read %d seconds", pThreadParam->iThreadID, iSleep);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        Sleep(iSleep * 1000);
        sprintf(buf, "Thread %d release lock", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        pThreadParam->pLock->releaseLock();
    }

    delete(pThreadParam);
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    const int iThreadNum = 5; 

    HANDLE hReaderMutex = CreateMutex(0, false, 0);
    int iReaderCount = 0; 

    WriteLock writeLock(iReaderCount, hReaderMutex);
    ReadLock readLock(iReaderCount, hReaderMutex);
    srand(time(0));

    HANDLE hThreads[iThreadNum * 2] = {0};

    for (int i=0; i<5; i++) {
        // create reader
        Sleep(10);
        ThreadParam *pThreadParam = new ThreadParam(i * 2, &readLock, rand() % g_iMaxSleepSeconds);
        hThreads[i*2] = CreateThread(0, 0, ReadThread, pThreadParam, 0, 0);

        // create writer
        Sleep(10);
        pThreadParam = new ThreadParam(i * 2 + 1, &writeLock, rand() % g_iMaxSleepSeconds);
        hThreads[i*2+1] = CreateThread(0, 0, WriteThread, pThreadParam, 0, 0);
    }

    WaitForMultipleObjects(iThreadNum * 2, hThreads, true, INFINITE);

    return 0; 
}

输出:

Thread 0 try to get read lock
Thread 0 get lock successfully
Thread 0 read 1 seconds
Thread 1 try to get write lock
Thread 2 try to get read lock
Thread 1 get lock failed
Thread 2 get lock successfully
Thread 3 try to get write lock
Thread 2 read 0 seconds
Thread 4 try to get read lock
Thread 3 get lock failed
Thread 2 release lock
Thread 5 try to get write lock
Thread 4 get lock successfully
Thread 6 try to get read lock
Thread 5 get lock failed
Thread 7 try to get write lock
Thread 4 read 3 seconds
Thread 6 get lock successfully
Thread 8 try to get read lock
Thread 7 get lock failed
Thread 9 try to get write lock
Thread 6 read 3 seconds
Thread 8 get lock successfully
Thread 9 get lock failed
Thread 8 read 4 seconds
Thread 0 release lock
Thread 4 release lock
Thread 6 release lock
Thread 8 release lock

其中有几个类是用来做辅助作用的,可以忽略。包括ResManager (用来管理资源), WritesgHelper(用来保证消息完整的输出到标准输出上)。

大概思路是这样的,定义了一个WriteLock类和一个ReadLock类,它们继承自BaseLock类。这样的用意是线程中使用时可以不用关心锁的类型,而是调用统一的接口就可以了。WriteLock和ReadLock实际上共享了一个mutex和一个整型变量。整型变量记录了读者的数量,而该mutex即为了保护这个变量而设。读锁每次简单的增加读者的数量,释放锁时减少读者的数量。而写锁每次要检查读者的数量,只有在读者数量为0时才进行写操作,否则写操作失败。

测试结果基本反映了我们设计的意图,读者可以并发的进行读操作,而写者操作具有排他性。但是我们发现一个问题,写者经常处于饥渴状态,以至于根本拿不到锁。第二版本我们试图改掉这个问题。读写锁有两种:第一种是强读者同步,即只要写者没有进行写操作就可以进行读操作;第二种是强读者同步,即只要读者没有进行读操作就可以进行写操作。这里先想办法实现一个简单的版本,就是写者如果申请锁失败了,不返回,而是阻塞等待。但是这个实现我大概想了想,好像只用MUTEX没有好的办法实现。我要回去再好好研究一下。大侠们有好的主意吗?

第二个版本: 尝试写了一个,发现原来如此简单,之前一直想不出来是因为总想在写锁中查看读者的状态,其实完全没必要:

class ResManager {
public:
    ResManager(HANDLE hHandle) : m_hHandle (hHandle) {
        WaitForSingleObject(m_hHandle, INFINITE);
    }

    ~ResManager() {
        ReleaseMutex(m_hHandle);
    }

private:
    HANDLE m_hHandle; 
};

class WriteMsgHelper {
public:
    static WriteMsgHelper* getInstance() {
        if (m_pInstance == 0) {
            m_pInstance = new WriteMsgHelper();
            m_hMutex = CreateMutex(0, false, 0);
        }
        return m_pInstance;
    }

    void WriteMsg(const string& str) {
        ResManager rm(m_hMutex);
        cout << str.c_str() << endl; 
    }
private:
    WriteMsgHelper() {}
    static WriteMsgHelper* m_pInstance; 
    static HANDLE m_hMutex; 
};

WriteMsgHelper* WriteMsgHelper::m_pInstance = 0; 
HANDLE WriteMsgHelper::m_hMutex = INVALID_HANDLE_VALUE;

class BaseLock {
public:
    virtual ~BaseLock() {}
    virtual bool getLock() = 0;
    virtual void releaseLock() = 0;
};

class WriteLock : public BaseLock 
{
public:
    WriteLock(HANDLE &hReaderMutex) 
        : m_hMutex_Writer(hReaderMutex)
    {

    }

    bool getLock() {
        WaitForSingleObject(m_hMutex_Writer, INFINITE);

        return true;
    }

    void releaseLock() {
        ReleaseMutex(m_hMutex_Writer);
    }
private:
    HANDLE &m_hMutex_Writer;
};

class ReadLock : public BaseLock
{
public:
    ReadLock(int &iReaderCount, HANDLE &hReaderMutex, HANDLE &hWriterMutex) 
        : m_iReaderCount(iReaderCount), m_hMutex_Reader(hReaderMutex), m_hMutex_Writer(hWriterMutex) {}
    bool getLock() {
        ResManager rm(m_hMutex_Reader);
        if (m_iReaderCount++ == 0)
            WaitForSingleObject(m_hMutex_Writer, INFINITE);
        return true;
    }
    void releaseLock() {
        ResManager rm(m_hMutex_Reader);
        m_iReaderCount--;
        if (m_iReaderCount == 0)
            ReleaseMutex(m_hMutex_Writer);
    }
private:
    int &m_iReaderCount;
    HANDLE &m_hMutex_Reader; 
    HANDLE &m_hMutex_Writer; 
};

struct ThreadParam {
    int iThreadID;
    BaseLock *pLock;
    int iRand;

    ThreadParam(int aiThreadID, BaseLock *apLock, int aiRand) 
        : iThreadID(aiThreadID), pLock(apLock), iRand(aiRand) {

    } 
};

const int g_iMaxSleepSeconds = 5; 

// thread try to write
DWORD WINAPI WriteThread(PVOID pvParam) {
    char buf[200] = {0};
    ThreadParam *pThreadParam = (ThreadParam *)pvParam;
    sprintf(buf, "Thread %d try to get write lock", pThreadParam->iThreadID);
    WriteMsgHelper::getInstance()->WriteMsg(buf);

    bool result = pThreadParam->pLock->getLock();
    if (!result) {
        sprintf(buf, "Thread %d get lock failed", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
    }
    else {
        sprintf(buf, "Thread %d get lock successfully", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);

        int iSleep = pThreadParam->iRand;
        sprintf(buf, "Thread %d write %d seconds", pThreadParam->iThreadID, iSleep);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        Sleep(iSleep * 1000);
        sprintf(buf, "Thread %d release lock", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        pThreadParam->pLock->releaseLock();
    }
    delete(pThreadParam);
    return 0; 
}

// thread try to read
DWORD WINAPI ReadThread(PVOID pvParam) {
    char buf[200] = {0};
    ThreadParam *pThreadParam = (ThreadParam*)pvParam;
    sprintf(buf, "Thread %d try to get read lock", pThreadParam->iThreadID);
    WriteMsgHelper::getInstance()->WriteMsg(buf);

    bool result = pThreadParam->pLock->getLock();
    if (!result) {
        sprintf(buf, "Thread %d get lock failed", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
    }
    else {
        sprintf(buf, "Thread %d get lock successfully", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        int iSleep = pThreadParam->iRand;
        sprintf(buf, "Thread %d read %d seconds", pThreadParam->iThreadID, iSleep);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        Sleep(iSleep * 1000);
        sprintf(buf, "Thread %d release lock", pThreadParam->iThreadID);
        WriteMsgHelper::getInstance()->WriteMsg(buf);
        pThreadParam->pLock->releaseLock();
    }

    delete(pThreadParam);
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    const int iThreadNum = 5; 

    HANDLE hReaderMutex = CreateMutex(0, false, 0);
    HANDLE hWriterMutex = CreateMutex(0, false, 0);
    int iReaderCount = 0; 

    WriteLock writeLock(hWriterMutex);
    ReadLock readLock(iReaderCount, hReaderMutex, hWriterMutex);
    srand(time(0));

    HANDLE hThreads[iThreadNum * 2] = {0};

    for (int i=0; i<5; i++) {
        // create reader
        Sleep(10);
        ThreadParam *pThreadParam = new ThreadParam(i * 2, &readLock, rand() % g_iMaxSleepSeconds);
        hThreads[i*2] = CreateThread(0, 0, ReadThread, pThreadParam, 0, 0);

        // create writer
        Sleep(10);
        pThreadParam = new ThreadParam(i * 2 + 1, &writeLock, rand() % g_iMaxSleepSeconds);
        hThreads[i*2+1] = CreateThread(0, 0, WriteThread, pThreadParam, 0, 0);
    }

    WaitForMultipleObjects(iThreadNum * 2, hThreads, true, INFINITE);

    return 0;
}

测试结果输出:

Thread 0 try to get read lock
Thread 0 get lock successfully
Thread 0 read 2 seconds
Thread 1 try to get write lock
Thread 2 try to get read lock
Thread 2 get lock successfully
Thread 2 read 3 seconds
Thread 3 try to get write lock
Thread 4 try to get read lock
Thread 4 get lock successfully
Thread 4 read 1 seconds
Thread 5 try to get write lock
Thread 6 try to get read lock
Thread 6 get lock successfully
Thread 6 read 0 seconds
Thread 7 try to get write lock
Thread 8 try to get read lock
Thread 8 get lock successfully
Thread 8 read 0 seconds
Thread 9 try to get write lock
Thread 0 release lock
Thread 1 get lock successfully
Thread 1 write 2 seconds
Thread 1 release lock
Thread 3 get lock successfully
Thread 3 write 4 seconds
Thread 3 release lock
Thread 5 get lock successfully
Thread 5 write 4 seconds
Thread 5 release lock
Thread 7 get lock successfully
Thread 7 write 0 seconds
Thread 7 release lock
Thread 9 get lock successfully
Thread 9 write 2 seconds
Thread 9 release lock
Thread 2 release lock
Thread 4 release lock
Thread 6 release lock
Thread 8 release lock

让我觉得不解的事情发生了。Thread 0是一个读线程,并且它发现它是第一个读线程,就锁住了写锁。但是在thread 0运行结束后,虽然thread 0没有释放写锁,但是写锁实际被释放了。换句话说,锁的操作状态只有在线程运行期有效,一旦线程运行结束,它对锁的操作(例如Lock())也就失效了。这中现象发生是应为我们调用了WaitForSingleObject()造成的。下面我换了一个自己用关键字段实现的锁:

class MyLock {
public:
    MyLock(CRITICAL_SECTION &cs) : m_cs(cs) {
        InitializeCriticalSection(&m_cs);
    }

    ~MyLock() {
        DeleteCriticalSection(&m_cs);
    }

    void lock() {
        EnterCriticalSection(&m_cs);
    }

    void unlock() {
        LeaveCriticalSection(&m_cs);
    }
private:
    CRITICAL_SECTION &m_cs; 
};

运行结果:

Thread 0 try to get read lock
Thread 0 get lock successfully
Thread 0 read 4 seconds
Thread 1 try to get write lock
Thread 2 try to get read lock
Thread 2 get lock successfully
Thread 2 read 3 seconds
Thread 3 try to get write lock
Thread 4 try to get read lock
Thread 4 get lock successfully
Thread 4 read 1 seconds
Thread 5 try to get write lock
Thread 6 try to get read lock
Thread 6 get lock successfully
Thread 6 read 1 seconds
Thread 7 try to get write lock
Thread 8 try to get read lock
Thread 8 get lock successfully
Thread 8 read 1 seconds
Thread 9 try to get write lock
Thread 0 release lock
Thread 2 release lock
Thread 4 release lock
Thread 6 release lock
Thread 8 release lock
Thread 1 get lock successfully
Thread 1 write 2 seconds
Thread 1 release lock
Thread 3 get lock successfully
Thread 3 write 3 seconds
Thread 3 release lock
Thread 5 get lock successfully
Thread 5 write 2 seconds
Thread 5 release lock
Thread 7 get lock successfully
Thread 7 write 2 seconds
Thread 7 release lock
Thread 9 get lock successfully
Thread 9 write 0 seconds
Thread 9 release lock

非常好,结果完全符合预期。但是我们看到这是一种读者优先的锁,也就是写者要等待所有恶读者完成读操作才能访问关键资源,即便他是先于某些读者到来的。下面一个实现实现一种写者优先的锁:

 

 

抱歉!评论已关闭.