现在的位置: 首页 > 综合 > 正文

TcpClient connector(39)

2013年12月15日 ⁄ 综合 ⁄ 共 14155字 ⁄ 字号 评论关闭

muduo库对编写tcp客户端程序的支持
Connector    // 主动发起连接
TcpClient    // 包含了一个Connector对象

测试程序
Reactor_test11.cc    // echo server

TcpClient_test.cc    // echo client




TcpClient 头文件

TcpClient.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPCLIENT_H
#define MUDUO_NET_TCPCLIENT_H

#include <boost/noncopyable.hpp>

#include <muduo/base/Mutex.h>
#include <muduo/net/TcpConnection.h>

namespace muduo
{
namespace net
{

class Connector;
typedef boost::shared_ptr<Connector> ConnectorPtr;

class TcpClient : boost::noncopyable
{
 public:
  // TcpClient(EventLoop* loop);
  // TcpClient(EventLoop* loop, const string& host, uint16_t port);
  TcpClient(EventLoop* loop,
            const InetAddress& serverAddr,
            const string& name);
  ~TcpClient();  // force out-line dtor, for scoped_ptr members.

  void connect();
  void disconnect();
  void stop();

  TcpConnectionPtr connection() const
  {
    MutexLockGuard lock(mutex_);
    return connection_;
  }

  bool retry() const;
  void enableRetry() { retry_ = true; }

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

 private:
  /// Not thread safe, but in loop
  void newConnection(int sockfd);
  /// Not thread safe, but in loop
  void removeConnection(const TcpConnectionPtr& conn);

  EventLoop* loop_;
  ConnectorPtr connector_;  // 用于主动发起连接
  const string name_;       // 名称
  ConnectionCallback connectionCallback_;       // 连接建立回调函数
  MessageCallback messageCallback_;             // 消息到来回调函数
  WriteCompleteCallback writeCompleteCallback_; // 数据发送完毕回调函数
  bool retry_;   // 重连,是指连接建立之后又断开的时候是否重连,
                //这个跟connector里面的retry是不一样的,connector里面的retry表示连接不成功时是否要重连
  bool connect_; // atomic 是否发起连接
  // always in loop thread
  int nextConnId_;          // name_ + nextConnId_用于标识一个连接
  mutable MutexLock mutex_;
  TcpConnectionPtr connection_; // Connector连接成功以后,得到一个TcpConnection
};

}
}

#endif  // MUDUO_NET_TCPCLIENT_H


TcpClient 源文件

TcpClient.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//

#include <muduo/net/TcpClient.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Connector.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

// TcpClient::TcpClient(EventLoop* loop)
//   : loop_(loop)
// {
// }

// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port)
//   : loop_(CHECK_NOTNULL(loop)),
//     serverAddr_(host, port)
// {
// }

namespace muduo
{
namespace net
{
namespace detail
{

void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
{
  loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
}

void removeConnector(const ConnectorPtr& connector)
{
  //connector->
}

}
}
}

TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& name)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(name),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  // 设置连接成功回调函数
  connector_->setNewConnectionCallback(
      boost::bind(&TcpClient::newConnection, this, _1));
  // FIXME setConnectFailedCallback
  LOG_INFO << "TcpClient::TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
}

TcpClient::~TcpClient()
{
  LOG_INFO << "TcpClient::~TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
  TcpConnectionPtr conn;
  {
    MutexLockGuard lock(mutex_);
    conn = connection_;
  }
  if (conn)
  {
    // FIXME: not 100% safe, if we are in different thread

    // 重新设置TcpConnection中的closeCallback_为detail::removeConnection
    //这里不用TcpClient::removeConnection的原因是 TcpClient::removeConnection 有重连功能,
    //这里已经不需要重连了,直接调用detail::removeConnection就行了
    CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1);
    loop_->runInLoop(
        boost::bind(&TcpConnection::setCloseCallback, conn, cb));
  }
  else
  {
    // 这种情况,说明connector处于未连接状态,将connector_停止
    connector_->stop();
    // FIXME: HACK
    loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_));
  }
}

void TcpClient::connect()
{
  // FIXME: check state
  LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
           << connector_->serverAddress().toIpPort();
  connect_ = true;
  connector_->start();   // 发起连接
}

// 用于连接已建立的情况下,关闭连接
void TcpClient::disconnect()
{
  connect_ = false;

  {
    MutexLockGuard lock(mutex_);
    if (connection_)
    {
      connection_->shutdown();
    }
  }
}

// 停止connector_ ,连接尚未成功时进行断开
void TcpClient::stop()
{
  connect_ = false;
  connector_->stop();
}

//有新的连接到来
void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    connection_ = conn;     // 保存TcpConnection
  }
  conn->connectEstablished();        // 这里回调connectionCallback_
}

/*移除connector*/
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());

  {
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    connection_.reset();
  }
  /*放到IO线程中销毁*/
  loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
  //如果需要重连,则重新连接
  if (retry_ && connect_)
  {
    LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
             << connector_->serverAddress().toIpPort();
    // 这里的重连是指连接建立成功之后被断开的重连
    connector_->restart();
  }
}



Connector 头文件

Connector.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_CONNECTOR_H
#define MUDUO_NET_CONNECTOR_H

#include <muduo/net/InetAddress.h>

#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

namespace muduo
{
namespace net
{

class Channel;
class EventLoop;

// 主动发起连接,带有自动重连功能
class Connector : boost::noncopyable,
                  public boost::enable_shared_from_this<Connector>
{
 public:
  typedef boost::function<void (int sockfd)> NewConnectionCallback;

  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();
  /*设置连接成功后的回调函数*/
  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void start();  // can be called in any thread
  void restart();  // must be called in loop thread
  void stop();  // can be called in any thread
//服务器IP地址
  const InetAddress& serverAddress() const { return serverAddr_; }

 private:
  enum States { kDisconnected/*断开状态*/, kConnecting/*正在连接中*/, kConnected/*已连接*/ };
  static const int kMaxRetryDelayMs = 30*1000;          // 30秒,最大重连延迟时间
  static const int kInitRetryDelayMs = 500;             // 0.5秒,初始状态,连接不上,0.5秒后重连

  void setState(States s) { state_ = s; }
  void startInLoop();
  void stopInLoop();
  void connect();
  void connecting(int sockfd);
  void handleWrite();
  void handleError();
  void retry(int sockfd);
  int removeAndResetChannel();
  void resetChannel();

  EventLoop* loop_;         // 所属EventLoop
  InetAddress serverAddr_;  // 服务器端地址
  bool connect_; // atomic是否连接
  States state_;  // FIXME: use atomic variable
  boost::scoped_ptr<Channel> channel_;    // Connector所对应的Channel
  NewConnectionCallback newConnectionCallback_;     // 连接成功回调函数,
  int retryDelayMs_;        // 重连延迟时间(单位:毫秒)
};

}
}

#endif  // MUDUO_NET_CONNECTOR_H


Connector 源文件

Connector.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//

#include <muduo/net/Connector.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <errno.h>

using namespace muduo;
using namespace muduo::net;

const int Connector::kMaxRetryDelayMs;

Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),
    serverAddr_(serverAddr),
    connect_(false),
    state_(kDisconnected),
    retryDelayMs_(kInitRetryDelayMs)
{
  LOG_DEBUG << "ctor[" << this << "]";
}

Connector::~Connector()
{
  LOG_DEBUG << "dtor[" << this << "]";
  assert(!channel_);
}

// 可以跨线程调用 ,把重新发起的连接函数放到IO线程去,所以是线程
// 安全的
void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

/*重新发起连接*/
void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)
  { 
    /*如果为true 重新连接,
    如果调用stop 后,那么connect_ = false
    */
    connect();
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

/*让IO线程线程停止连接,这个连接处于*/
void Connector::stop()
{
  connect_ = false;
  loop_->runInLoop(boost::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
  // FIXME: cancel timer
}

void Connector::stopInLoop()
{
  loop_->assertInLoopThread();
  if (state_ == kConnecting)
  {
    setState(kDisconnected);
    int sockfd = removeAndResetChannel();   // 将通道从poller中移除关注,并将channel置空
    retry(sockfd);      // 这里并非要重连,只是调用sockets::close(sockfd);
  }
}

/*发起连接  */
void Connector::connect()
{
  int sockfd = sockets::createNonblockingOrDie();   // 创建非阻塞套接字
  /*进行连接请求*/
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet());

  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    case 0:
    case EINPROGRESS:   // 非阻塞套接字,未连接成功返回码是EINPROGRESS表示正在连接
    case EINTR:
    case EISCONN:           // 连接成功
      connecting(sockfd);
      break;

    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);        // 重连
      break;

    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);   // 不能重连,关闭sockfd
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}

// 不能跨线程调用
void Connector::restart()
{
  loop_->assertInLoopThread();
  setState(kDisconnected);
  retryDelayMs_ = kInitRetryDelayMs;
  connect_ = true;
  startInLoop();
}

void Connector::connecting(int sockfd)
{
  //不管是真正连接还是连接成功了,都把状态设置为kConnecting
  setState(kConnecting);
  assert(!channel_);
  // Channel与sockfd关联
  channel_.reset(new Channel(loop_, sockfd));
  // 设置可写回调函数,主要是为了“正在连接”状态设置的
  channel_->setWriteCallback(
      boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  // 设置错误回调函数
  channel_->setErrorCallback(
      boost::bind(&Connector::handleError, this)); // FIXME: unsafe

  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr

  channel_->enableWriting();     // 让Poller关注可写事件,writing一定产生,不管是连接成功还是连接失败,
                                // 
}

int Connector::removeAndResetChannel()
{
  channel_->disableAll();
  channel_->remove();            // 从poller移除关注
  int sockfd = channel_->fd();
  // Can't reset channel_ here, because we are inside Channel::handleEvent
  // 不能在这里重置channel_,因为正在调用Channel::handleEvent
  loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe
  return sockfd;
}

void Connector::resetChannel()
{
  channel_.reset();     // channel_ 置空
}


void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite " << state_;
  /*由于在Connector::connecting注册可写事件时,“状态” 被设置为kConnecting(不管完不完成连接),
  所以第一次handleWrite时,得把“状态”设置为kConnected*/
  if (state_ == kConnecting)
  { 
    /*这里不用再关注可写事件了,这里也是防止busy loop*/
    int sockfd = removeAndResetChannel();   // 从poller中移除关注,并将channel置空
    // socket可写并不意味着连接一定建立成功
    // 还需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR, ...)再次确认一下。
    int err = sockets::getSocketError(sockfd);
    if (err)        // 有错误
    {
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
      retry(sockfd);        // 重连
    }
    else if (sockets::isSelfConnect(sockfd))        // 自连接
    {
      LOG_WARN << "Connector::handleWrite - Self connect";
      retry(sockfd);        // 重连
    }
    else    // 连接成功
    {
      setState(kConnected);
      if (connect_)
      {
        newConnectionCallback_(sockfd);     // 回调连接成功的回调函数
      }
      else
      {
        sockets::close(sockfd);
      }
    }
  }
  else
  {
    // what happened?
    assert(state_ == kDisconnected);
  }
}

/*连接出错的处理函数*/
void Connector::handleError()
{
  LOG_ERROR << "Connector::handleError";
  assert(state_ == kConnecting);

  int sockfd = removeAndResetChannel();     // 从poller中移除关注,并将channel置空
  int err = sockets::getSocketError(sockfd);
  LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
  retry(sockfd);
}

// 采用back-off策略重连,即重连时间逐渐延长,0.5s, 1s, 2s, ...直至30s
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);
  if (connect_)
  {
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
    // 注册一个定时操作,重连
    loop_->runAfter(retryDelayMs_/1000.0,
                    boost::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}


测试程序

#include <muduo/net/Channel.h>
#include <muduo/net/TcpClient.h>

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

class TestClient
{
 public:
  TestClient(EventLoop* loop, const InetAddress& listenAddr)
    : loop_(loop),
      client_(loop, listenAddr, "TestClient"),
      stdinChannel_(loop, 0)
  {
    client_.setConnectionCallback(
        boost::bind(&TestClient::onConnection, this, _1));
    client_.setMessageCallback(
        boost::bind(&TestClient::onMessage, this, _1, _2, _3));
    //client_.enableRetry();
    // 标准输入缓冲区中有数据的时候,回调TestClient::handleRead
    stdinChannel_.setReadCallback(boost::bind(&TestClient::handleRead, this));
    stdinChannel_.enableReading();
  }

  void connect()
  {
    client_.connect();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }

  void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
  {
    string msg(buf->retrieveAllAsString());
    printf("onMessage(): recv a message [%s]\n", msg.c_str());
    LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toFormattedString();
  }

  // 标准输入缓冲区中有数据的时候,回调该函数
  void handleRead()
  {
    char buf[1024] = {0};
    fgets(buf, 1024, stdin);
    buf[strlen(buf)-1] = '\0';      // 去除\n
    client_.connection()->send(buf);
  }

  EventLoop* loop_;
  TcpClient client_;
  Channel stdinChannel_;        // 标准输入Channel
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  EventLoop loop;
  InetAddress serverAddr("127.0.0.1", 8888);
  TestClient client(&loop, serverAddr);
  client.connect();
  loop.loop();
}

抱歉!评论已关闭.