概念介绍
关于“生产者/消费者”应该所有有过计算机开发经验的人员都有所了解。但在真正的工程开发中,很多人又很容易忽视该模式的应用。具体忽视该模式应用的原因,我总结有两个:一个是对该模式不熟,害怕出现难以控制的bug;另一个是打算使用该模式,但是却无法确定生产者对象和消费者对象之间的数据单元。本篇便是为了解决上述两个问题展开。
实例引出生产者与消费者
解释一个概念,我习惯的做法是通过源码来剖析。不是有位大师说过“源码之下,了无秘密”。下面将会用一个事例代码来对生产者和消费者模式进行简单介绍。
数据单元:
要想使用好生产者和消费者模式,最先要做到的,往往也是最难的一步:确定数据单元。
这里例子中生产者生产命令,消费者负责执行(消费)命令。
/// /// @brief /// 命令接口类 /// class Command { public: int m_nCmdID; ///< 命令唯一标识 public: virtual void Remove() {delete this;} ///< 释放命令对象 }
接下来我们来定义一系列具体的命令
/// /// @brief /// 命令类型 /// const int CMD_HERO_NONE = 0x0000000; ///< 空命令 const int CMD_HERO_MOVE_LEFT = 0x0000001; ///< 左移命令 const int CMD_HERO_MOVE_TOP = 0x0000002; ///< 上移命令 const int CMD_HERO_MOVE_RIGHT = 0x0000003; ///< 右移命令 const int CMD_HERO_MOVE_BOTTUM = 0x0000004; ///< 下移命令 const int CMD_HERO_SKILL_1 = 0x0000010; ///< 技能1 const int CMD_HERO_SKILL_2 = 0x0000011; ///< 技能2 const int CMD_HERO_SKILL_3 = 0x0000012; ///< 技能3 const int CMD_HERO_SKILL_4 = 0x0000013; ///< 技能4
/// /// @brief /// 移动命令 /// class MoveCommand : public Command { public: MoveCommand(int nStep) : m_nStep(nStep) { } public: int m_nStep; ///< 移动步数 }
/// /// @brief /// 技能命令 /// class SkillCommand : public Command { public: SkillCommand() {;} }
以上便是我们具体的数据单元,‘移动命令’负责包含移动命令数据内容,‘技能命令’负责包含技能命令的数据内容。当然乍一看技能命令是一个空类,但机智的读者一定发现‘技能命令’所需要的所有信息都包含在其父类信息中了。
接下来就是我们的生产命令和消费命令过程了。
生产者与消费者
生产者只负责生产命令,消费者负责从生产者生产的命令中去拿取并消费。那么生产者所生产的大量命令,我们该如何去存储呢?很显然的一个做法是使用队列,满足先进先出需求。
const int MAX_COMMAND_COUNT = 1024; ///< 最大存储的命令个数 /// /// @brief /// 命令管理器类 /// class CCommandMgr { public: CCommandMgr(void); CCommandMgr(const CCommandMgr &rhs); CCommandMgr& operator=(const CCommandMgr &rhs); public: virtual ~CCommandMgr(void); /// /// @brief /// 获取命令管理对象指针 /// static CCommandMgr *GetInstance(); /// /// @brief /// 添加操作命令对象 /// virtual int AddOperator(Command *cmd); /// /// @brief /// 获取操作命令对象 /// virtual Command* GetOperatorToExcute(); /// /// @brief /// 删除所有命令对象 /// virtual void RemoveAllCommand(); /// /// @brief /// 清理资源 /// void Release(); private: HANDLE m_hListMutex; ///< 队列锁,每次只能有一个线程访问队列 HANDLE m_hPutSemaphore; ///< 生产者信号量 HANDLE m_hGetSemaphore; ///< 消费者信号量 std::list<Command*> m_lstCmd; ///< 命令队列 static CCommandMgr *m_pCmdMgr; ///< 命令对象指针 }
CCommandMgr* CCommandMgr::m_pCmdMgr = NULL; CCommandMgr::CCommandMgr(void) { m_hListMutex = CreateMutex(NULL,FALSE,NULL); m_hPutSemaphore = CreateSemaphore( NULL, MAX_COMMAND_COUNT, MAX_COMMAND_COUNT, NULL); m_hGetSemaphore = CreateSemaphore( NULL, 0, MAX_COMMAND_COUNT, NULL); } CCommandMgr::~CCommandMgr(void) { } CCommandMgr* CCommandMgr::GetInstance() { if (NULL == m_pCmdMgr) { m_pCmdMgr = new CCommandMgr(); } return m_pCmdMgr; } int CCommandMgr::AddOperator(Command *cmd) { WaitForSingleObject(m_hPutSemaphore,INFINITE); WaitForSingleObject(m_hListMutex,INFINITE); ///< 可以对命令进行处理,判断是否有重复命令,如果有,则删除重复命令 m_lstCmd.push_front(cmd); ReleaseSemaphore(m_hGetSemaphore,1,NULL); ///< 在生产者线程中,消费者数据单元+1 ReleaseMutex(m_hListMutex); return 0; } Command* CCommandMgr::GetOperatorToExcute() { WaitForSingleObject(m_hGetSemaphore,INFINITE); WaitForSingleObject(m_hListMutex,INFINITE); Command* pCmd = m_lstCmd.back(); m_lstCmd.pop_back(); ReleaseSemaphore(m_hPutSemaphore,1,NULL); ///< 在消费者线程中,生产者数据单元+1 ReleaseMutex(m_hListMutex); return pCmd; } void CCommandMgr::RemoveAllCommand() { WaitForSingleObject(m_hListMutex,INFINITE); std::list<Command*>::iterator it; it = m_lstCmd.begin(); while (it != m_lstCmd.end()) ///< 删除所有命令,同时对应的所有生产与消费者也必须同步更新 { WaitForSingleObject(m_hGetSemaphore,INFINITE); (*it)->Remove(); ReleaseSemaphore(m_hPutSemaphore,1,NULL); it ++; } m_lstCmd.clear(); ReleaseMutex(m_hListMutex); } void CCommandMgr::Release() { WaitForSingleObject(m_hListMutex,INFINITE); ///< 释放命令管理对象 RemoveAllCommand(); CloseHandle(m_hListMutex); CloseHandle(m_hPutSemaphore); CloseHandle(m_hGetSemaphore); ReleaseMutex(m_hListMutex); delete m_pCmdMgr; m_pCmdMgr = NULL; }
如上为生产者与消费者处理管理模块,生产者在生产的时候需要判断是否还能继续生产,即判断生产者信号量是否有激活信号,如果有,则继续生产,若没有,只能等到消费者线程完成消费操作后,才能再进行生产;同样消费者线程,只能等到生产者生产有产品之后,才能进行消费。
下面提供一个消费者线程的实现
消费者线程
g_workThread = NULL; unsigned __stdcall CommandWorkThreadFunc(void *pArg) { HWND hNotifyWnd = (HWND)(pArg); if ( NULL == hNotifyWnd) { ASSERT(FALSE); return -1; } CCommandMgr* pCmdMgr = CCommandMgr::GetInstance(); ASSERT(pCmdMgr); while (TRUE) { Command *pCmd = pCmdMgr->GetOperatorToExcute(); ///< 消费者取出命令去执行 ASSERT(pCmd); Sleep(10); switch(pCmd->m_nCmdID) { case CMD_HERO_MOVE_LEFT: case CMD_HERO_MOVE_TOP: case CMD_HERO_MOVE_RIGHT: case CMD_HERO_MOVE_BUTTOM: { MoveCommand *pMoveCmd = dynamic_cast<MoveCommand*>(pCmd); SendMessage(hNotifyWnd, some_self_msg_define, pCmd->m_nCmdID, pMoveCmd->m_nStep); } break; case CMD_HERO_SKILL_1: case CMD_HERO_SKILL_2: case CMD_HERO_SKILL_3: case CMD_HERO_SKILL_4: { SkillCommand *pSkillCmd = dynamic_cast<SkillCommand*>(pCmd); SendMessage(hNotifyWnd, some_self_msg_define, pCmd->m_nCmdID, 0); } break; default: break; } pCmd->Remove(); ///< 执行完成后,销毁对应的数据单元 } CCommandMgr::GetInstance()->RemoveAllCommand(); return 0; }
如上消费者线程中,消费者不需要知道生产者的任何信息,只要生产者将数据信息放到数据单元,消费者线程就会自动执行,获取到相应的数据信息后,投递给相应的窗口进行处理。
分析
上面的例子代码,不知道是否达到了预期的让读者了解生产者/消费者模式。下面按步骤对上述事例进行阐述。
首先,需要确定我们要生产什么?消费什么?既确定要操作的数据单元。如果在编程中对数据单元没有十足的把握,可以像事例中,定义一个接口数据单元,这样做对后续如果对数据单元增删也是很有益处的。
其次,要实现我们的生产者和消费者管理类,在这里我们采用了信号量来分别控制生产者和消费者。生产者被设定了默认的最大激活信号量,这样做的目的是来控制生产者的生产上限,同时消费者被设定了默认的最小激活信号量,目的是来控制消费者的消费下限。
最后,一般情况下我们都是为消费者做一个消费者线程,这样生产者不论在其他任何线程生产数据单元的时候,消费者就会得到执行。生产者与消费者完全解耦。
如果还有读者不能明白,可以和我继续讨论,我也是正在学习该知识点。应用环境
生产者/消费者模式,可以应用于所有对消费者执行没有严格控制的地方。因为所有的数据单元一旦被生产者生产,随时都会被消费者所消费使用。在网络通信方面尤其实用,很多客户端需要向服务端发送指令,这时候,就可以采用生产者/消费者模型,客户端无论在什么时候生产指令,消费者都能及时的将数据单元解析,并迅速响应发送给服务端。
研究对象
生产者
负责数据单元的生产,对数据单元的封装操作。
消费者
负责数据单元的消费,对数据单元的解析操作。
总结
生产者/消费者虽然不属于23种设计模式中的任何一种,但是它将生产对象与消费对象解耦的过程却和设计模式相成相辅。熟练掌握该模式,会使应用程序的开发更轻便,性能也会相应增强。
PS:事例代码纯手工敲写,并未验证,有误之处请与我联系。