现在的位置: 首页 > 综合 > 正文

thrift源码研究-异步client&server

2018年03月30日 ⁄ 综合 ⁄ 共 7317字 ⁄ 字号 评论关闭

      之前研究的thrift源码都是属于thrift的同步client&server,接下来看看异步client&server是怎样的?

       通过搜索发现异步client&server网上介绍得少或基本没有相关信息,于是乎,还是啃源码来得爽快,因为有一句话我挺赞同的:源代码是最好的设计文档。

      下面以test.thrift为例来深入研究异步client&server

/* @file : test.thrift */ 

namespace cpp thrift.example

service Twitter {
   string sendString(1:string data);
}

      生成客户代码时,需要加cob_style来生成异步client&server,如下命令:

      thrift -r -strict  --gen cpp:cob_style -o ./ test.thrift

      执行该命令后,所生成目录gen-cpp下有一些生成文件,除了多个Twitter_async_server.skeleton.cpp和Twitter.h&Twitter.cpp多了些异步代码外,其他跟不加cob_style一样。

     好了,代码都有了,接下来就通过代码来看看异步client&server是如何工作的。

一、分析异步client

     先从客户代码开始研究,上面提到Twitter.h&Twitter.cpp多了些异步代码,那么只要分析这些异步代码就可以了,在test.thrift例中,异步代码主要包括TwitterCobClient、TwitterCobSvIf和TwitterAsyncProcessor三个类,这里先关下TwitterCobClient类,它是异步client,类声明如下:

class TwitterCobClient : virtual public TwitterCobClIf {
 public:
  TwitterCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
    channel_(channel),
    itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    piprot_(protocolFactory->getProtocol(itrans_)),
    poprot_(protocolFactory->getProtocol(otrans_)) {
    iprot_ = piprot_.get();
    oprot_ = poprot_.get();
  }
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
    return channel_;
  }
  virtual void completed__(bool /* success */) {}
  void sendString(std::tr1::function<void(TwitterCobClient* client)> cob, const std::string& data);
  void send_sendString(const std::string& data);
  void recv_sendString(std::string& _return);
 protected:
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
};

     通过类实现发现:(1)completed__(bool /* success */)是虚函数,用于通知用户数据接收完成,需要用户重载实现;(2)sendString函数带有回调参数function<void(TwitterCobClient* client)> cob,用于数据接收时回调,这是异步的特点;(3)send_sendString和recv_sendString分别用于写数据到输出缓存和从输入缓存读数据(4)它拥有TAsyncChannel,后面分析之;(5)thansport采用TMemoryBuffer,TMemoryBuffer是用于程序内部之间通信用的,在这里起到读写缓存作用

     看下关键函数sendString的实现:

void TwitterCobClient::sendString(std::tr1::function<void(TwitterCobClient* client)> cob, const std::string& data)
{
  send_sendString(data);      
  channel_->sendAndRecvMessage(std::tr1::bind(cob, this), otrans_.get(), itrans_.get());  
}

     很容易知道send_sendString是写数据到输出缓存,但channel_->sendAndRecvMessage是干什么呢?这需要找到具体类的实现,目前thrift的c++lib库里只找到一个TAsyncChannel的实现类:TEvhttpClientChannel,为节约篇幅,下面的实现代码都是精简之后的,对理解原理没影响。

      TEvhttpClientChannel的sendAndRecvMessage实现如下:

void TEvhttpClientChannel::sendAndRecvMessage(
    const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf) {
  cob_ = cob;           //  绑定回调函数,这里是直接赋值,可见一个请求得有独自的Channel,不然回调函数会被覆盖
  recvBuf_ = recvBuf;

  struct evhttp_request* req = evhttp_request_new(response, this);     // 采用http协议,收到完整数据后会回调response

  uint8_t* obuf;
  uint32_t sz;
  sendBuf->getBuffer(&obuf, &sz);  // 从输出缓存读数据

  rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());  // 发送http请求

}

     从sendAndRecvMessage实现可看出,TEvhttpClientChannel是用采用http协议来与服务器通信,后面介绍异步server时会发现,同样采用是http协议,它们使用的http库是libevent库的evhttp。

      看下回调函数response的实现:

/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
  TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
  self->recvBuf_->resetBuffer(
      EVBUFFER_DATA(req->input_buffer),
      EVBUFFER_LENGTH(req->input_buffer));    // 写数据到输入缓存
    self-> cob_();   // 回调用户的注册函数
  }

   从TEvhttpClientChannel的构造函数可知:用户需要new 一个event_base传入TEvhttpClientChannel,利用event_base来异步收发数据。
   至此,异步client源码分析完。

   二、异步server

       异步server关心另外两个类:TwitterCobSvIf和TwitterAsyncProcessor。很明显TwitterCobSvIf是要用户继承实现的,它与同步TwitterSvIf不同的地方是成员函数多一个cob回调函数,在实现TwitterSvIf时,需要调用cob。示例如下:
class TwitterCobSvNull : virtual public TwitterCobSvIf {
 public:
  virtual ~TwitterCobSvNull() {}
  void sendString(std::tr1::function<void(std::string const& _return)> cob, const std::string& /* data */) {
    std::string _return = "";
    return cob(_return);
  }
};

     TwitterAsyncProcessor和同步TwitterProcessor的区别同上,即成员函数多一个cob回调函数。

     那么这个cob是什么函数,哪里注册的?这在thrift lib库里的TEvhttpServer和TAsyncProtocolProcessor类里可找到答案,其中TEvhttpServer是异步server,传输是采用http协议,与异步client对上。

     先看看TEvhttpServer实现,同样采用event_base来异步收发数据,收到数据时,回调request函数:

void TEvhttpServer::request(struct evhttp_request* req, void* self) {
  try {
    static_cast<TEvhttpServer*>(self)->process(req);
  } catch(std::exception& e) {
    evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0);
  }
}
void TEvhttpServer::process(struct evhttp_request* req) {
  RequestContext* ctx = new RequestContext(req);
  return processor_->process(     // 这里的processor_正是TAsyncProtocolProcessor
      std::tr1::bind(
        &TEvhttpServer::complete,   // 注册complete
        this,
        ctx,
        std::tr1::placeholders::_1),
      ctx->ibuf,
      ctx->obuf);
}

void TEvhttpServer::complete(RequestContext* ctx, bool success) {
  std::auto_ptr<RequestContext> ptr(ctx);

  int code = success ? 200 : 400;
  const char* reason = success ? "OK" : "Bad Request";

  int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");

  struct evbuffer* buf = evbuffer_new();
    uint8_t* obuf;
    uint32_t sz;
    ctx->obuf->getBuffer(&obuf, &sz);   // 从输出缓冲读数据
    int ret = evbuffer_add(buf, obuf, sz);

  evhttp_send_reply(ctx->req, code, reason, buf);   // 发送数据
  }

    接着看TAsyncProtocolProcessor的process实现

void TAsyncProtocolProcessor::process(
    std::tr1::function<void(bool healthy)> _return,
    boost::shared_ptr<TBufferBase> ibuf,
    boost::shared_ptr<TBufferBase> obuf) {
  boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
  boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
  return underlying_->process(     // underlying_是生成代码里的TwitterAsyncProcessor
      std::tr1::bind(
        &TAsyncProtocolProcessor::finish,   
        _return,  // compere函数
        oprot,
        std::tr1::placeholders::_1),
      iprot, oprot);
}

/* static */ void TAsyncProtocolProcessor::finish(
    std::tr1::function<void(bool healthy)> _return,
    boost::shared_ptr<TProtocol> oprot,
    bool healthy) {
  (void) oprot;
  // This is a stub function to hold a reference to oprot.
  return _return(healthy);  // 回调compere函数
}

      最后看TwitterAsyncProcessor::process,它先写fname,mtype, seqid然后调用process_fn,process_fn选择调用合理的处理函数(如process_sendString),看process_sendString实现:

 void TwitterAsyncProcessor::process_sendString(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)

void (TwitterAsyncProcessor::*return_fn)(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, const std::string& _return)    =
    &TwitterAsyncProcessor::return_sendString;              // return_sendString正是我们要找的cob函数
    iface_->sendString(                      // iface_是TwitterCobSvIf的具体类,用户实现的
      std::tr1::bind(return_fn, this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1),    // cob 是 finish函数
      args.data);

}

    上面return_sendString是我们要找的cob函数,该函数将用户处理的结果写入输出冲缓,并发送给client。

    至此,异步server分析完毕。

 

总结:

        异步client和异步server需要一起使用,不能指定协议,采用HTTP协议进行通信,使用TMemoryBuffer来作输入输出缓存。异步client发出异步请求时没有超时机制,有可能一直不回调callback函数,多个请求不能共享异步client,而且需要用户来实现上下文的绑定。 异步server与同步server在使用上几乎一样,区别不大!

抱歉!评论已关闭.