现在的位置: 首页 > 综合 > 正文

生产者与消费者

2013年12月02日 ⁄ 综合 ⁄ 共 5317字 ⁄ 字号 评论关闭

概念介绍

关于“生产者/消费者”应该所有有过计算机开发经验的人员都有所了解。但在真正的工程开发中,很多人又很容易忽视该模式的应用。具体忽视该模式应用的原因,我总结有两个:一个是对该模式不熟,害怕出现难以控制的bug;另一个是打算使用该模式,但是却无法确定生产者对象和消费者对象之间的数据单元。本篇便是为了解决上述两个问题展开。

实例引出生产者与消费者

解释一个概念,我习惯的做法是通过源码来剖析。不是有位大师说过“源码之下,了无秘密”。下面将会用一个事例代码来对生产者和消费者模式进行简单介绍。

数据单元:

要想使用好生产者和消费者模式,最先要做到的,往往也是最难的一步:确定数据单元

这里例子中生产者生产命令,消费者负责执行(消费)命令。

///
/// @brief
///     命令接口类
///
class Command
{
public:
     int m_nCmdID;                              ///< 命令唯一标识
public:
     virtual void Remove() {delete this;}       ///< 释放命令对象
}

接下来我们来定义一系列具体的命令

///
/// @brief
///       命令类型
///
const int CMD_HERO_NONE        = 0x0000000;   ///< 空命令
const int CMD_HERO_MOVE_LEFT   = 0x0000001;   ///< 左移命令
const int CMD_HERO_MOVE_TOP    = 0x0000002;   ///< 上移命令
const int CMD_HERO_MOVE_RIGHT  = 0x0000003;   ///< 右移命令
const int CMD_HERO_MOVE_BOTTUM = 0x0000004;   ///< 下移命令

const int CMD_HERO_SKILL_1     = 0x0000010;   ///< 技能1
const int CMD_HERO_SKILL_2     = 0x0000011;   ///< 技能2
const int CMD_HERO_SKILL_3     = 0x0000012;   ///< 技能3
const int CMD_HERO_SKILL_4     = 0x0000013;   ///< 技能4

///
/// @brief
///    移动命令
///
class MoveCommand : public Command
{
public:
     MoveCommand(int nStep)
      : m_nStep(nStep)
     {
     }
public:
     int m_nStep;                       ///< 移动步数
}

///
/// @brief
///     技能命令
///
class SkillCommand : public Command
{
public:
      SkillCommand()
      {;}
}

以上便是我们具体的数据单元,‘移动命令’负责包含移动命令数据内容,‘技能命令’负责包含技能命令的数据内容。当然乍一看技能命令是一个空类,但机智的读者一定发现‘技能命令’所需要的所有信息都包含在其父类信息中了。

接下来就是我们的生产命令和消费命令过程了。

生产者与消费者

生产者只负责生产命令,消费者负责从生产者生产的命令中去拿取并消费。那么生产者所生产的大量命令,我们该如何去存储呢?很显然的一个做法是使用队列,满足先进先出需求。

const int MAX_COMMAND_COUNT = 1024;                  ///< 最大存储的命令个数

///
/// @brief
///     命令管理器类
///
class CCommandMgr
{
public:
    CCommandMgr(void);
    CCommandMgr(const CCommandMgr &rhs);
    CCommandMgr& operator=(const CCommandMgr &rhs);
public:
    virtual ~CCommandMgr(void);

    ///
    /// @brief
    ///    获取命令管理对象指针
    ///
    static CCommandMgr *GetInstance();
    ///
    /// @brief
    ///    添加操作命令对象
    ///
    virtual int AddOperator(Command *cmd);
    ///
    /// @brief
    ///    获取操作命令对象
    ///
    virtual Command* GetOperatorToExcute();
    ///
    /// @brief
    ///    删除所有命令对象
    ///
    virtual void RemoveAllCommand();
    ///
    /// @brief
    ///    清理资源
    ///
    void Release();
private:
    HANDLE              m_hListMutex;           ///< 队列锁,每次只能有一个线程访问队列
    HANDLE              m_hPutSemaphore;        ///< 生产者信号量
    HANDLE              m_hGetSemaphore;        ///< 消费者信号量
    std::list<Command*> m_lstCmd;               ///< 命令队列

    static CCommandMgr *m_pCmdMgr;              ///< 命令对象指针
}

CCommandMgr* CCommandMgr::m_pCmdMgr = NULL;

CCommandMgr::CCommandMgr(void)
{
    m_hListMutex = CreateMutex(NULL,FALSE,NULL);
    m_hPutSemaphore = CreateSemaphore( NULL, MAX_COMMAND_COUNT, MAX_COMMAND_COUNT, NULL);
    m_hGetSemaphore = CreateSemaphore( NULL, 0, MAX_COMMAND_COUNT, NULL);
}

CCommandMgr::~CCommandMgr(void)
{
}

CCommandMgr* CCommandMgr::GetInstance()
{
    if (NULL == m_pCmdMgr)
    {
         m_pCmdMgr = new CCommandMgr();
    }
    return m_pCmdMgr;
}

int CCommandMgr::AddOperator(Command *cmd)
{
    WaitForSingleObject(m_hPutSemaphore,INFINITE);
    WaitForSingleObject(m_hListMutex,INFINITE);

    ///< 可以对命令进行处理,判断是否有重复命令,如果有,则删除重复命令
    m_lstCmd.push_front(cmd);
    
    ReleaseSemaphore(m_hGetSemaphore,1,NULL);           ///< 在生产者线程中,消费者数据单元+1
    ReleaseMutex(m_hListMutex);
    return 0;
}

Command* CCommandMgr::GetOperatorToExcute()
{
    WaitForSingleObject(m_hGetSemaphore,INFINITE);
    WaitForSingleObject(m_hListMutex,INFINITE);

    Command* pCmd = m_lstCmd.back();
    m_lstCmd.pop_back();

    ReleaseSemaphore(m_hPutSemaphore,1,NULL);           ///< 在消费者线程中,生产者数据单元+1
    ReleaseMutex(m_hListMutex);
    return pCmd;
}

void CCommandMgr::RemoveAllCommand()
{
    WaitForSingleObject(m_hListMutex,INFINITE);

    std::list<Command*>::iterator it;
    it = m_lstCmd.begin();
    while (it != m_lstCmd.end())                       ///< 删除所有命令,同时对应的所有生产与消费者也必须同步更新
    {
         WaitForSingleObject(m_hGetSemaphore,INFINITE);
         (*it)->Remove();
         ReleaseSemaphore(m_hPutSemaphore,1,NULL);

         it ++;
    }

    m_lstCmd.clear();
    ReleaseMutex(m_hListMutex);
}

void CCommandMgr::Release()
{
    WaitForSingleObject(m_hListMutex,INFINITE);	        ///< 释放命令管理对象
    RemoveAllCommand();

    CloseHandle(m_hListMutex);
    CloseHandle(m_hPutSemaphore);
    CloseHandle(m_hGetSemaphore);

    ReleaseMutex(m_hListMutex);
    delete m_pCmdMgr;
    m_pCmdMgr = NULL;
}

如上为生产者与消费者处理管理模块,生产者在生产的时候需要判断是否还能继续生产,即判断生产者信号量是否有激活信号,如果有,则继续生产,若没有,只能等到消费者线程完成消费操作后,才能再进行生产;同样消费者线程,只能等到生产者生产有产品之后,才能进行消费。

下面提供一个消费者线程的实现

消费者线程

g_workThread = NULL;

unsigned __stdcall CommandWorkThreadFunc(void *pArg)
{
     HWND hNotifyWnd = (HWND)(pArg);
     if ( NULL == hNotifyWnd)
     {
         ASSERT(FALSE);
         return -1;
     }
    
     CCommandMgr* pCmdMgr = CCommandMgr::GetInstance();
     ASSERT(pCmdMgr);
     
     while (TRUE)
     {
         Command *pCmd = pCmdMgr->GetOperatorToExcute();         ///< 消费者取出命令去执行
         ASSERT(pCmd);
         Sleep(10);
         switch(pCmd->m_nCmdID)
         {
          case CMD_HERO_MOVE_LEFT:
          case CMD_HERO_MOVE_TOP:
          case CMD_HERO_MOVE_RIGHT:
          case CMD_HERO_MOVE_BUTTOM:
               {
                     MoveCommand *pMoveCmd = dynamic_cast<MoveCommand*>(pCmd);
                     SendMessage(hNotifyWnd, some_self_msg_define, pCmd->m_nCmdID, pMoveCmd->m_nStep);
               }
               break;
         case CMD_HERO_SKILL_1:
         case CMD_HERO_SKILL_2:
         case CMD_HERO_SKILL_3:
         case CMD_HERO_SKILL_4:
               {
                    SkillCommand *pSkillCmd = dynamic_cast<SkillCommand*>(pCmd);
                    SendMessage(hNotifyWnd, some_self_msg_define, pCmd->m_nCmdID, 0);
               }
               break;
         default:
               break;
         }
         pCmd->Remove();            ///< 执行完成后,销毁对应的数据单元
     }

     CCommandMgr::GetInstance()->RemoveAllCommand();
     return 0;
}

如上消费者线程中,消费者不需要知道生产者的任何信息,只要生产者将数据信息放到数据单元,消费者线程就会自动执行,获取到相应的数据信息后,投递给相应的窗口进行处理。

分析

上面的例子代码,不知道是否达到了预期的让读者了解生产者/消费者模式。下面按步骤对上述事例进行阐述。

首先,需要确定我们要生产什么?消费什么?既确定要操作的数据单元。如果在编程中对数据单元没有十足的把握,可以像事例中,定义一个接口数据单元,这样做对后续如果对数据单元增删也是很有益处的。

其次,要实现我们的生产者和消费者管理类,在这里我们采用了信号量来分别控制生产者和消费者。生产者被设定了默认的最大激活信号量,这样做的目的是来控制生产者的生产上限,同时消费者被设定了默认的最小激活信号量,目的是来控制消费者的消费下限。

最后,一般情况下我们都是为消费者做一个消费者线程,这样生产者不论在其他任何线程生产数据单元的时候,消费者就会得到执行。生产者与消费者完全解耦。

如果还有读者不能明白,可以和我继续讨论,我也是正在学习该知识点。

应用环境

生产者/消费者模式,可以应用于所有对消费者执行没有严格控制的地方。因为所有的数据单元一旦被生产者生产,随时都会被消费者所消费使用。在网络通信方面尤其实用,很多客户端需要向服务端发送指令,这时候,就可以采用生产者/消费者模型,客户端无论在什么时候生产指令,消费者都能及时的将数据单元解析,并迅速响应发送给服务端。

研究对象

      生产者

负责数据单元的生产,对数据单元的封装操作。

      消费者

负责数据单元的消费,对数据单元的解析操作。

总结

生产者/消费者虽然不属于23种设计模式中的任何一种,但是它将生产对象与消费对象解耦的过程却和设计模式相成相辅。熟练掌握该模式,会使应用程序的开发更轻便,性能也会相应增强。

PS:事例代码纯手工敲写,并未验证,有误之处请与我联系。

参考

  1. 架构设计:生产者/消费者模式[0]:概述

抱歉!评论已关闭.