上一篇《ACE通用服务端框架》为我们展示了ACE接受器(Acceptor)和连接器(Connector)设计模式的高度集成度,本篇延续上一篇的思路,完成通用客户端的设计和编码。
由于ACE充分发挥了C++模板技术的优势,使得Connector模式与Acceptor模式在编码上是如此接近。
好了,开始我们的编码旅程吧。同样的,本框架部分代码来自《The ACE Programmers Guide》7.6 Using the Acceptor-Connector Framework章节和《ACE自适配通信环境中文技术文档-中篇:ACE程序员教程》第七章的思路。
1、main.cpp
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/streams.h>
#include <ace/Reactor.h>
#include <ace/TP_Reactor.h>
#include <ace/Select_Reactor.h>
#include <ace/INET_Addr.h>
#include <ace/Signal.h>
#include "Global_Define.h"
int ACE_TMAIN (int, ACE_TCHAR *[])
{
ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor;
ACE_Reactor *reactor = new ACE_Reactor(tp_reactor, 1);
ACE_Reactor::instance(reactor, 1);
//ACE_TP_Reactor tp_reactor;
//ACE_Reactor reactor(&tp_reactor, 0);
//ACE_Reactor::instance(&reactor, 0);
//ACE_Select_Reactor select_reactor;
//ACE_Reactor reactor(&select_reactor, 0);
//ACE_Reactor::instance(&reactor, 0);
GlobalValue g_val;
if (g_val.load_config(CONFIG_FILE) == -1)
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT ("(%P|%t) Load config file fail!")), 1);
/****************************************************************
* TODO 如果需要,在此处注册信号
*****************************************************************/
ScheduleHandler sh;
// 注册信号,处理方法:ScheduleHandler::handle_signal()
ACE_Sig_Set sig_set;
sig_set.sig_add (SIGINT);
sig_set.sig_add (SIGQUIT);
sig_set.sig_add (SIGTERM);
sig_set.sig_add (SIGSEGV);
ACE_Reactor::instance()->register_handler(sig_set, &sh);
// 注册读取配置文件定时器,处理方法:ScheduleHandler::handle_timeout()
ACE_Reactor::instance()->schedule_timer(&sh, (const void *)&g_val, ACE_Time_Value(10), ACE_Time_Value(10));
//ACE_INET_Addr port_to_connect(50001, ACE_LOCALHOST);
ACE_INET_Addr port_to_connect(g_val.client_port, g_val.client_host.c_str());
ClientConnector connector;
do {
Client21 client;
Client21 *pc = &client;
if (connector.connect(pc, port_to_connect) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT ("(%P|%t) %p. %s/n"), ACE_TEXT ("Connect"), ACE_TEXT("Auto re-connect 5 seconds later")));
ACE_OS::sleep(5);
continue;
}
ACE_Reactor::instance()->run_reactor_event_loop();
client.close();
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) Auto re-connect 3 seconds later/n")));
ACE_OS::sleep(3);
ACE_Reactor::instance()->reset_reactor_event_loop();
} while(true);
ACE_Reactor::instance()->close();
ACE_Thread_Manager::instance()->wait(&ACE_Time_Value(10));
ACE_OS::exit(0);
return (0);
}
2、Client21.cpp
#include "Global_Define.h"
/*记录最近一次接入的客户端ACE_SOCK_STREAM,用于控制一次只能允许一个客户端接入的情况*/
//ACE_SOCK_STREAM * last_peer = NULL;
/**
 * Creates the handler in the inactive state and applies SEND_HIGHT_MARK as
 * the high water mark of its own message queue.
 */
Client21::Client21(void) : active_(false) /* mode 2: notifier_(0, this, ACE_Event_Handler::WRITE_MASK) */
{
    this->msg_queue()->high_water_mark(SEND_HIGHT_MARK);
    /* mode 2 */
    //this->notifier_.reactor(this->reactor());
    //this->msg_queue()->notification_strategy(&this->notifier_);
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Client21 start\n")));
}
/**
 * Marks the handler inactive and closes its message queue.
 */
Client21::~Client21(void)
{
    this->active_ = false;
    this->msg_queue()->close();
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Client21 stop\n")));
}
// 在连接建立后被自动回调
int Client21::open(void *p)
{
if (super::open(p) == -1) return -1;
ACE_TCHAR peer_name[MAXHOSTNAMELEN];
ACE_INET_Addr peer_addr;
if (this->peer().get_remote_addr(peer_addr) == 0 && peer_addr.addr_to_string (peer_name, MAXHOSTNAMELEN) == 0)
ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Connected to %s/n"), peer_name));
this->active_ = true;
task_recv.set_active(true);
task_send.set_active(true);
task_recv.set_task_send(&task_send);
if (task_recv.activate(THR_NEW_LWP | THR_JOINABLE, TASK_RECV_POOL_SIZE) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) TaskRecv activate failed/n")));
}
task_send.set_task_peer((ACE_Task<ACE_MT_SYNCH> *)this);
if (task_send.activate(THR_NEW_LWP | THR_JOINABLE, TASK_SEND_POOL_SIZE) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) TaskSend activate failed/n")));
}
/*回调*/
this->start_0();
/*开启定时处理线程*/
ACE_Time_Value interval(1); // One seconds
return this->reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, interval);
//return 0;
}
// 客户端接收到服务端发送的数据后,回调此函数
int Client21::handle_input(ACE_HANDLE)
{
char buffer[INPUT_SIZE] = {0};
ssize_t recv_cnt = 0;
#if (MSG_HEAD_MODE==1) /*接收消息头模式*/
/****************************************************************
* TODO 根据实际消息头格式修改解析方法
*****************************************************************/
ssize_t msg_len = 0;
recv_cnt = this->peer().recv(buffer, MSG_HEAD_SIZE);
if (recv_cnt > 0 && recv_cnt == MSG_HEAD_SIZE)
{
/*************************
* TODO 注意这里的4个字节
**************************/
msg_len = SET_BYTE4(buffer[0]&0xFF) + SET_BYTE3(buffer[1]&0xFF) + SET_BYTE2(buffer[2]&0xFF) + SET_BYTE1(buffer[3]&0xFF);
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Message length (%d) %s/n"), msg_len, msg_len<INPUT_SIZE-MSG_HEAD_SIZE?ACE_TEXT(""):ACE_TEXT(": warning!!! out of range")));
if (msg_len > 0 && msg_len < INPUT_SIZE-MSG_HEAD_SIZE)
{
if (msg_len == MSG_HEAD_SIZE)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Recv(%d+%d):[%s]/n"), MSG_HEAD_SIZE, msg_len-MSG_HEAD_SIZE, buffer));
ACE_Message_Block *mb = NULL;
ACE_NEW_RETURN(mb, ACE_Message_Block(msg_len+1), -1);
ACE_OS::memcpy(mb->wr_ptr(), buffer, msg_len);
mb->wr_ptr(msg_len);
if (task_recv.putq(mb) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT ("(%P|%t) %p; discarding data/n"), ACE_TEXT ("Client21 enqueue to TaskRecv failed")));
mb->release();
return 0;
}
return 0;
}
else
{
recv_cnt = this->peer().recv(buffer+MSG_HEAD_SIZE, (msg_len-MSG_HEAD_SIZE));
if (recv_cnt > 0 && recv_cnt == (msg_len-MSG_HEAD_SIZE))
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Recv(%d+%d):[%s]/n"), MSG_HEAD_SIZE, msg_len-MSG_HEAD_SIZE, buffer));
ACE_Message_Block *mb = NULL;
ACE_NEW_RETURN(mb, ACE_Message_Block(msg_len+1), -1);
ACE_OS::memcpy(mb->wr_ptr(), buffer, msg_len);
mb->wr_ptr(msg_len);
if (task_recv.putq(mb) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT ("(%P|%t) %p; discarding data/n"), ACE_TEXT ("Client21 enqueue to TaskRecv failed")));
mb->release();
return 0;
}
return 0;
}
if (recv_cnt <= 0 || ACE_OS::last_error() != EWOULDBLOCK)
{
this->active_ = false;
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) Connection closed1/n")));
/*表示客户端已断开*/
return -1;
}
}
}
return 0;
}
if (recv_cnt <= 0 || ACE_OS::last_error() != EWOULDBLOCK)
{
this->active_ = false;
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) Connection closed/n")));
/*表示客户端已断开*/
return -1;
}
#else /*不接收消息头模式*/
recv_cnt = this->peer().recv(buffer, sizeof(buffer));
if (recv_cnt > 0)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Recv(%d):[%s]/n"), recv_cnt, buffer));
ACE_Message_Block *mb = NULL;
ACE_NEW_RETURN(mb, ACE_Message_Block(recv_cnt+1), -1);
ACE_OS::memcpy(mb->wr_ptr(), buffer, recv_cnt);
mb->wr_ptr(recv_cnt);
if (task_recv.putq(mb) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT ("(%P|%t) %p; discarding data/n"), ACE_TEXT ("Client21 enqueue to TaskRecv failed")));
mb->release();
return 0;
}
return 0;
}
if (recv_cnt <= 0 || ACE_OS::last_error() != EWOULDBLOCK)
{
this->active_ = false;
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) Connection closed/n")));
/*表示客户端已断开*/
return -1;
}
#endif
return 0;
}
// 客户端向服务端发送数据时,回调此函数
/**
 * Called back when the socket is writable: drains this handler's message
 * queue onto the socket.
 *
 * A block that could only be sent partially is put back at the head of the
 * queue and the write mask is left enabled, so the remainder goes out on the
 * next callback. A block whose send fails is logged and dropped.
 *
 * @return always 0.
 */
int Client21::handle_output (ACE_HANDLE)
{
    ACE_Message_Block *mb = NULL;
    ACE_Time_Value nowait (ACE_OS::gettimeofday ());
    while (!this->msg_queue()->is_empty())
    {
        // Stop instead of spinning if the queue can no longer be read.
        if (this->getq(mb, &nowait) == -1)
            break;

        // Terminate the payload in the block's spare space (if any) so the
        // "%s" log below does not run past the data.
        if (mb->space() > 0)
            *mb->wr_ptr() = '\0';

        const ssize_t send_cnt = this->peer().send (mb->rd_ptr(), mb->length());
        if (send_cnt == -1)
        {
            ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("Client21 send")));
            mb->release ();
            continue;
        }

        ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Send(%d):[%s]\n"), static_cast<int>(send_cnt), mb->rd_ptr()));
        mb->rd_ptr(static_cast<size_t>(send_cnt));
        if (mb->length() > 0)
        {
            // Partial send: keep the unsent tail first in line and stay
            // registered for write events.
            if (this->ungetq(mb) == -1)
                mb->release ();
            return 0;
        }
        mb->release ();
    }
    /* mode 2 */
    /*
    if (this->msg_queue()->is_empty())
    this->reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK);
    else
    this->reactor()->schedule_wakeup(this, ACE_Event_Handler::WRITE_MASK);
    */
    /* (mode 1) */ this->reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK);
    // A producer may have enqueued between the emptiness check and the
    // cancel above; re-arm so that block is not left waiting.
    if (!this->msg_queue()->is_empty())
        this->reactor()->schedule_wakeup(this, ACE_Event_Handler::WRITE_MASK);
    return 0;
}
// 定时处理函数
int Client21::handle_timeout(const ACE_Time_Value ¤t_time, const void *act)
{
if (this->active_ != true) return 0;
this->timeout_0();
return 0;
}
// 服务端断开时,回调此函数
/**
 * Called back when this handler is removed from the reactor, e.g. after the
 * server disconnected.
 *
 * A removal of the write mask alone is ignored. For anything else the worker
 * tasks are stopped, all queues are closed, the worker threads are joined,
 * the stop_0() hook runs, the socket is closed and the base-class
 * handle_close() is invoked.
 *
 * @param h     Handle passed through to the base class.
 * @param mask  Mask that triggered the removal.
 * @return 0 for a WRITE_MASK removal, otherwise the base-class result.
 */
int Client21::handle_close (ACE_HANDLE h, ACE_Reactor_Mask mask)
{
    if (mask == ACE_Event_Handler::WRITE_MASK)
        return 0;
    else
    {
        ACE_TCHAR peer_name[MAXHOSTNAMELEN];
        ACE_INET_Addr peer_addr;
        if (this->peer().get_remote_addr(peer_addr) == 0 && peer_addr.addr_to_string (peer_name, MAXHOSTNAMELEN) == 0)
            ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Disconnected %s\n"), peer_name));
        /* release queue resources */
        // Clear the flags first so the svc() loops terminate once their
        // blocking getq() is released by the queue close below.
        task_send.set_active(false);
        task_recv.set_active(false);
        this->set_active(false);
        task_send.msg_queue()->close();
        task_recv.msg_queue()->close();
        this->msg_queue()->close();
        task_send.wait();
        task_recv.wait();
        this->wait();
        /* user hook */
        // NOTE(review): stop_0() only ends the reactor event loop while
        // get_active() is true, but the flag was cleared a few lines above,
        // so that branch cannot be taken from here - confirm the intent.
        this->stop_0();
        /* close the link */
        this->peer().close_reader();
        this->peer().close_writer();
        this->peer().close();
        return super::handle_close (h, mask);
    }
}
/**********************************************************************************************************************************/
void Client21::set_active(bool b)
{
this->active_ = b;
}
bool Client21::get_active()
{
return this->active_;
}
/**********************************************************************************************************************************/
// Hook invoked from open() once the worker tasks have been started.
// Currently empty apart from the disabled "last_peer" logic below.
void Client21::start_0()
{
/*
if (last_peer != NULL)
{
last_peer->close_reader();
last_peer->close_writer();
last_peer->close();
}
last_peer = &this->peer();
*/
/****************************************************************
* TODO: put the initialisation needed after connecting to the server here
*       (the connection is already established at this point)
*****************************************************************/
}
/**
 * Hook invoked from handle_close(): ends the reactor event loop, but only
 * while the handler is still flagged active.
 */
void Client21::stop_0()
{
    //if (last_peer == &this->peer()) last_peer = NULL;
    const bool still_active = this->get_active();
    if (still_active)
        this->reactor()->end_reactor_event_loop();   /* leave the client loop */
    /****************************************************************
    * TODO: put the work needed after disconnecting from the server here
    *****************************************************************/
}
/**
 * Hook invoked from handle_timeout() while the handler is active.
 * Currently it only writes a debug log line.
 */
void Client21::timeout_0()
{
    /****************************************************************
    * TODO: implement the client's periodic work here, e.g. heartbeats
    *****************************************************************/
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) handle_timeout\n")));
}
3、TaskRecv.cpp
#include <ace/OS.h>
#include <ace/Message_Block.h>
#include <ace/Log_Msg.h>
//处理接收队列
/**
 * Worker-thread body of the receive task: takes blocks off the task's queue
 * and passes each one to process_recv() for as long as the task is active.
 *
 * @return always 0.
 */
int TaskRecv::svc(void)
{
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskRecv::svc() start\n")));
    while (this->active_==true)
    {
        ACE_Message_Block *mb = NULL;
        // A failed getq() just re-checks the active flag.
        if (this->getq(mb) == -1)
        {
            continue;
        }
        process_recv(mb);
    }
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskRecv::svc() stop\n")));
    return 0;
}
/**
 * Processes one received message and releases it.
 *
 * The sample implementation logs the payload, sleeps three seconds and sends
 * the same bytes back through send_msg().
 *
 * @param mb  Block taken from the receive queue; ignored when NULL (the
 *            declaration defaults the parameter to NULL).
 */
void TaskRecv::process_recv(ACE_Message_Block *mb)
{
    if (mb == NULL)
        return;

    // Terminate the payload in the block's spare space (if any) so the "%s"
    // log below does not run past the data.
    if (mb->space() > 0)
        *mb->wr_ptr() = '\0';

    char * data = mb->rd_ptr();
    size_t data_size = mb->length();
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Processing recv message: length %d : %s\n"), static_cast<int>(data_size), data));
    /****************************************************************
    * TODO: handle the data received from the peer; it starts at `data`
    *       and is `data_size` bytes long
    *****************************************************************/
    ACE_OS::sleep(3);
    // Example of how to send a reply.
    send_msg(data, data_size);
    // The block must be released once processing is finished.
    mb->release();
}
//向对等端发送数据,实际操作是向发送队列插入一条 ACE_Message_Block
//返回 -1:失败 >=0:发送数据长度
int TaskRecv::send_msg(char * _buf, size_t _size)
{
if (_size == 0) return 0;
if (this->active_ != true) return -1;
return this->task_send_->send_msg(_buf, _size);
}
/**********************************************************************************************************************************/
void TaskRecv::set_active(bool b)
{
this->active_ = b;
}
bool TaskRecv::get_active()
{
return this->active_;
}
/**
 * Attaches the send task used by send_msg(). Only the first attachment takes
 * effect; later calls leave an already set pointer untouched.
 */
void TaskRecv::set_task_send(TaskSend *_task_send)
{
    if (this->task_send_)
        return;
    this->task_send_ = _task_send;
}
/** Returns the attached send task, or NULL if none has been set. */
TaskSend * TaskRecv::get_task_send()
{
    return task_send_;
}
4、TaskSend.cpp
#include <ace/OS.h>
#include <ace/Message_Block.h>
#include <ace/Log_Msg.h>
//处理发送队列
/**
 * Worker-thread body of the send task: takes blocks off the task's queue and
 * passes each one to process_send() for as long as the task is active.
 *
 * @return always 0.
 */
int TaskSend::svc(void)
{
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskSend::svc() start\n")));
    while (this->active_==true)
    {
        ACE_Message_Block *mb = NULL;
        // A failed getq() just re-checks the active flag.
        if (this->getq(mb) == -1)
        {
            continue;
        }
        process_send(mb);
    }
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskSend::svc() stop\n")));
    return 0;
}
/**
 * Hands one outgoing block to the peer handler's queue and asks the reactor
 * to call the peer back when the socket is writable.
 *
 * The block is released here whenever it is not handed over: when no active
 * peer is attached, or when the enqueue fails.
 *
 * @param mb  Block taken from the send queue; ignored when NULL (the
 *            declaration defaults the parameter to NULL).
 */
void TaskSend::process_send(ACE_Message_Block *mb)
{
    if (mb == NULL)
        return;

    Client21 *peer = static_cast<Client21 *>(this->task_peer_);
    if (peer == NULL || peer->get_active() != true)
    {
        // Dropping the message must not leak the block.
        mb->release();
        return;
    }

    // Terminate the payload in the block's spare space (if any) so the "%s"
    // log below does not run past the data.
    if (mb->space() > 0)
        *mb->wr_ptr() = '\0';

    // Log before putq(): once enqueued, the block may be consumed and
    // released by another thread.
    char * data = mb->rd_ptr();
    size_t data_size = mb->length();
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Processing send message: length %d : %s\n"), static_cast<int>(data_size), data));
    if (this->task_peer_->putq(mb) == -1)
    {
        ACE_DEBUG((LM_ERROR, ACE_TEXT ("(%P|%t) %p; discarding data\n"), ACE_TEXT ("TaskSend enqueue to Svc_Handler failed")));
        mb->release();
    }
    /* (mode 1) */ this->task_peer_->reactor()->schedule_wakeup(this->task_peer_, ACE_Event_Handler::WRITE_MASK);
}
/**********************************************************************************************************************************/
void TaskSend::set_active(bool b)
{
this->active_ = b;
}
bool TaskSend::get_active()
{
return this->active_;
}
/**
 * Attaches the task whose queue receives outgoing blocks. Only the first
 * attachment takes effect; later calls leave an already set pointer untouched.
 */
void TaskSend::set_task_peer(ACE_Task<ACE_MT_SYNCH> * _task_peer_)
{
    if (this->task_peer_)
        return;
    this->task_peer_ = _task_peer_;
}
/** Returns the attached peer task, or NULL if none has been set. */
ACE_Task<ACE_MT_SYNCH> * TaskSend::get_task_peer()
{
    return task_peer_;
}
/**********************************************************************************************************************************/
// 可以调用此方法向对等端发送数据,实际操作是向发送队列插入一条 ACE_Message_Block
// 返回 -1:失败 >=0:发送数据长度
/**
 * Queues data for sending to the peer: the bytes are copied into a new
 * ACE_Message_Block that is inserted into this task's send queue.
 *
 * @param _buf   Bytes to send.
 * @param _size  Number of bytes.
 * @return 0 when @a _size is 0; -1 when @a _buf is NULL, the task is
 *         inactive, allocation fails or the enqueue fails; otherwise
 *         @a _size.
 */
int TaskSend::send_msg(char * _buf, size_t _size)
{
    if (_size==0)
        return 0;
    if (_buf == NULL || this->active_ != true)
        return -1;

    ACE_Message_Block *mb = NULL;
    ACE_NEW_RETURN(mb, ACE_Message_Block(_size+1), -1);
    ACE_OS::memcpy(mb->wr_ptr(), _buf, _size);
    mb->wr_ptr(_size);
    // Use the extra allocated byte as a terminator (not counted in length())
    // so that later "%s" logging of the payload stays inside the block.
    *mb->wr_ptr() = '\0';

    if (this->putq(mb) == -1)
    {
        ACE_DEBUG((LM_ERROR, ACE_TEXT ("(%P|%t) %p; discarding data\n"), ACE_TEXT ("TaskSend enqueue failed")));
        mb->release();
        return -1;
    }
    return static_cast<int>(_size);
}
5、ScheduleHandler.cpp
#include "ace/OS.h"
#include "ace/Reactor.h"
#include "ace/Timer_Queue.h"
#include "ace/Time_Value.h"
#include "ace/Event_Handler.h"
#include "ace/Log_Msg.h"
#include "Global_Define.h"
// 定时任务,定时更新配置参数
int ScheduleHandler::handle_timeout(const ACE_Time_Value &c_tv, const void *arg)
{
GlobalValue * _val = (GlobalValue *)arg;
_val->load_config(CONFIG_FILE);
return 0;
}
// 信号处理
/**
 * Signal callback: logs the signal number and terminates the process with
 * exit status 0.
 *
 * @param signum  Number of the received signal.
 * @return 0 (not reached, because ACE_OS::exit() is called first).
 */
int ScheduleHandler::handle_signal(int signum, siginfo_t *, ucontext_t *)
{
    // NOTE(review): logging and exit() may be executed in signal context,
    // depending on how the reactor dispatches signals - confirm this is safe.
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Receive system signal %d.\n"), signum));
    /****************************************************************
    * TODO: handle the signal here if needed; it must have been registered
    *****************************************************************/
    ACE_OS::exit(0);
    return 0;
}
6、Config.cpp
#include "Global_Define.h"
#include <ace/OS.h>
#include <ace/Configuration.h>
#include <ace/Configuration_Import_Export.h>
// 载入配置文件
int GlobalValue::load_config(const char* config_filename_)
{
ACE_TString str;
long num = 0;
ACE_Configuration_Heap config;
if (config.open() == -1)
{
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) %p/n"), ACE_TEXT("config.open()")), -1);
}
ACE_Ini_ImpExp config_importer(config);
if (config_importer.import_config(config_filename_) == -1)
{
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) %p/n"), ACE_TEXT(config_filename_)), -1);
}
ACE_Configuration_Section_Key status_section;
if (config.open_section (config.root_section(), ACE_TEXT("SYSTEM"), 0, status_section) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("(%P|%t) %p/n"), ACE_TEXT ("Can't open [SYSTEM] section")), -1);
}
// 服务地址
if (config.get_string_value(status_section, ACE_TEXT("CLIENT_HOST"), str) != -1)
{
if (str.compare(this->client_host) != 0)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) [SYSTEM]->'CLIENT_HOST' is updated (%s)->(%s)/n"), this->client_host.c_str(), str.c_str()));
this->client_host = str;
}
str.clear();
}
else
{
this->client_host.clear();
this->client_host = ACE_TEXT("127.0.0.1");
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) [SYSTEM]->'CLIENT_HOST' does not exist, default value is (%s)/n"), this->client_host.c_str()));
}
// 服务端口
if (config.get_string_value(status_section, ACE_TEXT("CLIENT_PORT"), str) != -1)
{
try
{
num = ACE_OS::strtol(str.c_str(), NULL, 10);
if (num != this->client_port)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) [SYSTEM]->'CLIENT_PORT' is updated (%d)->(%d)/n"), this->client_port, num));
this->client_port = static_cast<u_short> (num);
}
}
catch (...)
{
this->client_port = 50001;
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) [SYSTEM]->'CLIENT_PORT' is catch exception, default value is (%d)/n"), this->client_port));
}
num = 0;
}
else
{
this->client_port = 50001;
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) [SYSTEM]->'CLIENT_PORT' does not exist, default value is (%d)/n"), this->client_port));
}
/****************************************************************
* TODO 在此增加读取配置文件的参数
*****************************************************************/
return 0;
}
7、Global_Define.h
// Include guard; it is closed by the #endif on the last line of this header.
#ifndef GLOBAL_DEFINE_H
#define GLOBAL_DEFINE_H
#include <ace/Task.h>
#include <ace/Synch.h>
#include <ace/INET_Addr.h>
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Connector.h>
#include <ace/Connector.h>
#include <ace/Reactor.h>
#include <ace/Reactor_Notification_Strategy.h>
/* Size constants are parenthesised so they expand as a single value inside
   larger expressions (e.g. `x % INPUT_SIZE`). */
#define RECV_HIGHT_MARK (8*1024) /* high water mark of the receive queue */
#define SEND_HIGHT_MARK (8*1024) /* high water mark of the send queue */
#define INPUT_SIZE (4*1024) /* size of the receive buffer */
#define TASK_RECV_POOL_SIZE 8 // thread-pool size of the task that processes received data
#define TASK_SEND_POOL_SIZE 1 // thread-pool size of the task that processes outgoing data
#define MSG_HEAD_MODE 0 /* whether messages carry a header: 0 = no, 1 = yes */
#define MSG_HEAD_SIZE 12
#define CONFIG_FILE "../config/config.cfg"
/* GET_BYTEn(a): extracts the n-th byte of a, counted from the least significant one. */
#define GET_BYTE1(a) ( (a)%0x100 ) /* lowest byte */
#define GET_BYTE2(a) ( ((a)/0x100)%0x100 ) /* second-lowest byte */
#define GET_BYTE3(a) ( ((a)/0x10000)%0x100 ) /* third-lowest byte */
#define GET_BYTE4(a) ( ((a)/0x1000000)%0x100 ) /* fourth-lowest byte */
/* SET_BYTEn(a): moves the byte value a into the n-th byte position. */
#define SET_BYTE1(a) ( (a) ) /* lowest byte */
#define SET_BYTE2(a) ( (a)*0x100 ) /* second-lowest byte */
#define SET_BYTE3(a) ( (a)*0x10000 ) /* third-lowest byte */
#define SET_BYTE4(a) ( (a)*0x1000000 ) /* fourth-lowest byte */
/**********************************************************************************************************************************/
/********************************************************************************************
* TODO 全局参数类,在此增加全局参数
********************************************************************************************/
class GlobalValue
{
public:
ACE_TString client_host; /*客户端主机地址*/
u_short client_port; /*客户端监听端口*/
int load_config(const char* config_filename_);
};
/**********************************************************************************************************************************/
/********************************************************************************************
* 采用ACE_Task任务或主动对象处理模式,处理服务器向客户端发送的数据队列
********************************************************************************************/
/**
 * Task (active object) that processes the queue of outgoing data: each
 * queued block is handed to the attached peer task for transmission.
 */
class TaskSend: public ACE_Task<ACE_MT_SYNCH>
{
public:
    /** Starts inactive, without a peer, and applies SEND_HIGHT_MARK to the queue. */
    TaskSend() : active_(false), task_peer_(NULL)
    {
        this->msg_queue()->high_water_mark(SEND_HIGHT_MARK);
        ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskSend start\n")));
    }
    /** Marks the task inactive and closes its queue. */
    virtual ~TaskSend()
    {
        this->active_ = false;
        this->msg_queue()->close();
        ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskSend stop\n")));
    }
    /** Worker-thread body. */
    virtual int svc(void);
    void set_active(bool b);
    bool get_active();
    /** Attaches / returns the task whose queue receives outgoing blocks. */
    void set_task_peer(ACE_Task<ACE_MT_SYNCH> * _task_peer_);
    ACE_Task<ACE_MT_SYNCH> * get_task_peer();
    /** Copies the bytes into a message block and queues it on this task. */
    int send_msg(char * _buf, size_t _size);
private:
    bool active_;                          // polled by svc()
    ACE_Task<ACE_MT_SYNCH> * task_peer_;   // not owned
    void process_send(ACE_Message_Block *mb = NULL);
};
/********************************************************************************************
* 采用ACE_Task任务或主动对象处理模式,处理服务器接收到的客户端数据队列
********************************************************************************************/
/**
 * Task (active object) that processes the queue of received data.
 */
class TaskRecv: public ACE_Task<ACE_MT_SYNCH>
{
public:
    /** Starts inactive, without a send task, and applies RECV_HIGHT_MARK to the queue. */
    TaskRecv() : active_(false), task_send_(NULL)
    {
        this->msg_queue()->high_water_mark(RECV_HIGHT_MARK);
        ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskRecv start\n")));
    }
    /** Marks the task inactive and closes its queue. */
    virtual ~TaskRecv()
    {
        this->active_ = false;
        this->msg_queue()->close();
        ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TaskRecv stop\n")));
    }
    /** Worker-thread body. */
    virtual int svc(void);
    void set_active(bool b);
    bool get_active();
    /** Attaches / returns the send task used for replies. */
    void set_task_send(TaskSend *_task_send);
    TaskSend * get_task_send();
private:
    bool active_;            // polled by svc()
    TaskSend *task_send_;    // not owned
    void process_recv(ACE_Message_Block *mb = NULL);
    int send_msg(char * _buf, size_t _size);
};
/********************************************************************************************
* 采用ACE连接器(Connector)连接建立模式,实现双工单路客户端
********************************************************************************************/
/**
 * Service handler created through the ACE Connector: a full-duplex,
 * single-link client that owns one receive task and one send task.
 */
class Client21 : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
{
    typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> super;
public:
    Client21(void);
    virtual ~Client21(void);
    // Called back automatically once the connection has been established.
    virtual int open (void * = 0);
    // Called when input is available from the peer.
    virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
    // Called when output is possible.
    virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
    // Called when a timer expires.
    virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act = 0);
    // Called when this handler is removed from the ACE_Reactor.
    virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
    void set_active(bool b);
    bool get_active();
private:
    bool active_;
    TaskRecv task_recv;
    TaskSend task_send;
    /********************************************************************************************
    * Strategy object (Strategy pattern). If an ACE_Message_Queue has a strategy object, the queue
    * calls that object's notify() whenever an ACE_Message_Block is enqueued; the notification is
    * queued and targets this object's handle_output() method.
    ********************************************************************************************/
    /* mode 2 */ //ACE_Reactor_Notification_Strategy notifier_;
    /* Hooks in which users implement their own behaviour. */
    void start_0(); /* called after the connection to the peer is up */
    void stop_0(); /* called after the peer has been closed */
    void timeout_0(); /* periodic work */
};
/**********************************************************************************************************************************/
/********************************************************************************************
* 应用程序定时任务,及信号处理
********************************************************************************************/
/**
 * Application-level event handler for the periodic configuration reload and
 * for signal handling.
 */
class ScheduleHandler : public ACE_Event_Handler
{
public:
    // Timer callback: reloads the configuration into the GlobalValue passed as act.
    virtual int handle_timeout(const ACE_Time_Value &current_time, const void *act = 0);
    // Signal callback for the signals registered in main().
    virtual int handle_signal(int signum, siginfo_t * = 0, ucontext_t * = 0);
};
/**********************************************************************************************************************************/
// Connector type that establishes ACE_SOCK_CONNECTOR connections for Client21
// handlers; main() uses it to connect.
typedef ACE_Connector<Client21, ACE_SOCK_CONNECTOR> ClientConnector;
#endif