之前研究的thrift源码都是属于thrift的同步client&server,接下来看看异步client&server是怎样的?
通过搜索发现异步client&server网上介绍得少或基本没有相关信息,于是乎,还是啃源码来得爽快,因为有一句话我挺赞同的:源代码是最好的设计文档。
下面以test.thrift为例来深入研究异步client&server
/* @file : test.thrift */
namespace cpp thrift.example
service Twitter {
string sendString(1:string data);
}
生成客户代码时,需要加cob_style来生成异步client&server,如下命令:
thrift -r -strict --gen cpp:cob_style -o ./ test.thrift
执行该命令后,所生成目录gen-cpp下有一些生成文件,除了多个Twitter_async_server.skeleton.cpp和Twitter.h&Twitter.cpp多了些异步代码外,其他跟不加cob_style一样。
好了,代码都有了,接下来就通过代码来看看异步client&server是如何工作的。
一、分析异步client
先从客户代码开始研究,上面提到Twitter.h&Twitter.cpp多了些异步代码,那么只要分析这些异步代码就可以了,在test.thrift例中,异步代码主要包括TwitterCobClient、TwitterCobSvIf和TwitterAsyncProcessor三个类,这里先关下TwitterCobClient类,它是异步client,类声明如下:
class TwitterCobClient : virtual public TwitterCobClIf {
public:
TwitterCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
channel_(channel),
itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
piprot_(protocolFactory->getProtocol(itrans_)),
poprot_(protocolFactory->getProtocol(otrans_)) {
iprot_ = piprot_.get();
oprot_ = poprot_.get();
}
boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
return channel_;
}
virtual void completed__(bool /* success */) {}
void sendString(std::tr1::function<void(TwitterCobClient* client)> cob, const std::string& data);
void send_sendString(const std::string& data);
void recv_sendString(std::string& _return);
protected:
boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
::apache::thrift::protocol::TProtocol* iprot_;
::apache::thrift::protocol::TProtocol* oprot_;
};
通过类实现发现:(1)completed__(bool /* success */)是虚函数,用于通知用户数据接收完成,需要用户重载实现;(2)sendString函数带有回调参数function<void(TwitterCobClient* client)> cob,用于数据接收时回调,这是异步的特点;(3)send_sendString和recv_sendString分别用于写数据到输出缓存和从输入缓存读数据(4)它拥有TAsyncChannel,后面分析之;(5)thansport采用TMemoryBuffer,TMemoryBuffer是用于程序内部之间通信用的,在这里起到读写缓存作用
看下关键函数sendString的实现:
void TwitterCobClient::sendString(std::tr1::function<void(TwitterCobClient* client)> cob, const std::string& data)
{
send_sendString(data);
channel_->sendAndRecvMessage(std::tr1::bind(cob, this), otrans_.get(), itrans_.get());
}
很容易知道send_sendString是写数据到输出缓存,但channel_->sendAndRecvMessage是干什么呢?这需要找到具体类的实现,目前thrift的c++lib库里只找到一个TAsyncChannel的实现类:TEvhttpClientChannel,为节约篇幅,下面的实现代码都是精简之后的,对理解原理没影响。
TEvhttpClientChannel的sendAndRecvMessage实现如下:
void TEvhttpClientChannel::sendAndRecvMessage(
const VoidCallback& cob,
apache::thrift::transport::TMemoryBuffer* sendBuf,
apache::thrift::transport::TMemoryBuffer* recvBuf) {
cob_ = cob; // 绑定回调函数,这里是直接赋值,可见一个请求得有独自的Channel,不然回调函数会被覆盖
recvBuf_ = recvBuf;
struct evhttp_request* req = evhttp_request_new(response, this); // 采用http协议,收到完整数据后会回调response
uint8_t* obuf;
uint32_t sz;
sendBuf->getBuffer(&obuf, &sz); // 从输出缓存读数据
rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str()); // 发送http请求
}
从sendAndRecvMessage实现可看出,TEvhttpClientChannel是用采用http协议来与服务器通信,后面介绍异步server时会发现,同样采用是http协议,它们使用的http库是libevent库的evhttp。
看下回调函数response的实现:
/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
self->recvBuf_->resetBuffer(
EVBUFFER_DATA(req->input_buffer),
EVBUFFER_LENGTH(req->input_buffer)); // 写数据到输入缓存
self-> cob_(); // 回调用户的注册函数
}
从TEvhttpClientChannel的构造函数可知:用户需要new 一个event_base传入TEvhttpClientChannel,利用event_base来异步收发数据。
至此,异步client源码分析完。
二、异步server
异步server关心另外两个类:TwitterCobSvIf和TwitterAsyncProcessor。很明显TwitterCobSvIf是要用户继承实现的,它与同步TwitterSvIf不同的地方是成员函数多一个cob回调函数,在实现TwitterSvIf时,需要调用cob。示例如下:
class TwitterCobSvNull : virtual public TwitterCobSvIf {
public:
virtual ~TwitterCobSvNull() {}
void sendString(std::tr1::function<void(std::string const& _return)> cob, const std::string& /* data */) {
std::string _return = "";
return cob(_return);
}
};
TwitterAsyncProcessor和同步TwitterProcessor的区别同上,即成员函数多一个cob回调函数。
那么这个cob是什么函数,哪里注册的?这在thrift lib库里的TEvhttpServer和TAsyncProtocolProcessor类里可找到答案,其中TEvhttpServer是异步server,传输是采用http协议,与异步client对上。
先看看TEvhttpServer实现,同样采用event_base来异步收发数据,收到数据时,回调request函数:
void TEvhttpServer::request(struct evhttp_request* req, void* self) {
try {
static_cast<TEvhttpServer*>(self)->process(req);
} catch(std::exception& e) {
evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0);
}
}
void TEvhttpServer::process(struct evhttp_request* req) {
RequestContext* ctx = new RequestContext(req);
return processor_->process( // 这里的processor_正是TAsyncProtocolProcessor
std::tr1::bind(
&TEvhttpServer::complete, // 注册complete
this,
ctx,
std::tr1::placeholders::_1),
ctx->ibuf,
ctx->obuf);
}
void TEvhttpServer::complete(RequestContext* ctx, bool success) {
std::auto_ptr<RequestContext> ptr(ctx);
int code = success ? 200 : 400;
const char* reason = success ? "OK" : "Bad Request";
int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
struct evbuffer* buf = evbuffer_new();
uint8_t* obuf;
uint32_t sz;
ctx->obuf->getBuffer(&obuf, &sz); // 从输出缓冲读数据
int ret = evbuffer_add(buf, obuf, sz);
evhttp_send_reply(ctx->req, code, reason, buf); // 发送数据
}
接着看TAsyncProtocolProcessor的process实现
void TAsyncProtocolProcessor::process(
std::tr1::function<void(bool healthy)> _return,
boost::shared_ptr<TBufferBase> ibuf,
boost::shared_ptr<TBufferBase> obuf) {
boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
return underlying_->process( // underlying_是生成代码里的TwitterAsyncProcessor
std::tr1::bind(
&TAsyncProtocolProcessor::finish,
_return, // compere函数
oprot,
std::tr1::placeholders::_1),
iprot, oprot);
}
/* static */ void TAsyncProtocolProcessor::finish(
std::tr1::function<void(bool healthy)> _return,
boost::shared_ptr<TProtocol> oprot,
bool healthy) {
(void) oprot;
// This is a stub function to hold a reference to oprot.
return _return(healthy); // 回调compere函数
}
最后看TwitterAsyncProcessor::process,它先写fname,mtype, seqid然后调用process_fn,process_fn选择调用合理的处理函数(如process_sendString),看process_sendString实现:
void TwitterAsyncProcessor::process_sendString(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)
void (TwitterAsyncProcessor::*return_fn)(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, const std::string& _return) =
&TwitterAsyncProcessor::return_sendString; // return_sendString正是我们要找的cob函数
iface_->sendString( // iface_是TwitterCobSvIf的具体类,用户实现的
std::tr1::bind(return_fn, this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1), // cob 是 finish函数
args.data);
}
上面return_sendString是我们要找的cob函数,该函数将用户处理的结果写入输出冲缓,并发送给client。
至此,异步server分析完毕。
总结:
异步client和异步server需要一起使用,不能指定协议,采用HTTP协议进行通信,使用TMemoryBuffer来作输入输出缓存。异步client发出异步请求时没有超时机制,有可能一直不回调callback函数,多个请求不能共享异步client,而且需要用户来实现上下文的绑定。 异步server与同步server在使用上几乎一样,区别不大!