muduo库对编写TCP客户端程序的支持
Connector // 主动发起连接
TcpClient // 包含了一个Connector对象
测试程序
Reactor_test11.cc // echo server
TcpClient_test.cc // echo client
TcpClient 头文件
TcpClient.h
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is a public header file, it must only include public header files. #ifndef MUDUO_NET_TCPCLIENT_H #define MUDUO_NET_TCPCLIENT_H #include <boost/noncopyable.hpp> #include <muduo/base/Mutex.h> #include <muduo/net/TcpConnection.h> namespace muduo { namespace net { class Connector; typedef boost::shared_ptr<Connector> ConnectorPtr; class TcpClient : boost::noncopyable { public: // TcpClient(EventLoop* loop); // TcpClient(EventLoop* loop, const string& host, uint16_t port); TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& name); ~TcpClient(); // force out-line dtor, for scoped_ptr members. void connect(); void disconnect(); void stop(); TcpConnectionPtr connection() const { MutexLockGuard lock(mutex_); return connection_; } bool retry() const; void enableRetry() { retry_ = true; } /// Set connection callback. /// Not thread safe. void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } /// Set message callback. /// Not thread safe. void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } /// Set write complete callback. /// Not thread safe. 
void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } private: /// Not thread safe, but in loop void newConnection(int sockfd); /// Not thread safe, but in loop void removeConnection(const TcpConnectionPtr& conn); EventLoop* loop_; ConnectorPtr connector_; // 用于主动发起连接 const string name_; // 名称 ConnectionCallback connectionCallback_; // 连接建立回调函数 MessageCallback messageCallback_; // 消息到来回调函数 WriteCompleteCallback writeCompleteCallback_; // 数据发送完毕回调函数 bool retry_; // 重连,是指连接建立之后又断开的时候是否重连, //这个跟connector里面的retry是不一样的,connector里面的retry表示连接不成功时是否要重连 bool connect_; // atomic 是否发起连接 // always in loop thread int nextConnId_; // name_ + nextConnId_用于标识一个连接 mutable MutexLock mutex_; TcpConnectionPtr connection_; // Connector连接成功以后,得到一个TcpConnection }; } } #endif // MUDUO_NET_TCPCLIENT_H
TcpClient 源文件
TcpClient.cc
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // #include <muduo/net/TcpClient.h> #include <muduo/base/Logging.h> #include <muduo/net/Connector.h> #include <muduo/net/EventLoop.h> #include <muduo/net/SocketsOps.h> #include <boost/bind.hpp> #include <stdio.h> // snprintf using namespace muduo; using namespace muduo::net; // TcpClient::TcpClient(EventLoop* loop) // : loop_(loop) // { // } // TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port) // : loop_(CHECK_NOTNULL(loop)), // serverAddr_(host, port) // { // } namespace muduo { namespace net { namespace detail { void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn) { loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn)); } void removeConnector(const ConnectorPtr& connector) { //connector-> } } } } TcpClient::TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& name) : loop_(CHECK_NOTNULL(loop)), connector_(new Connector(loop, serverAddr)), name_(name), connectionCallback_(defaultConnectionCallback), messageCallback_(defaultMessageCallback), retry_(false), connect_(true), nextConnId_(1) { // 设置连接成功回调函数 connector_->setNewConnectionCallback( boost::bind(&TcpClient::newConnection, this, _1)); // FIXME setConnectFailedCallback LOG_INFO << "TcpClient::TcpClient[" << name_ << "] - connector " << get_pointer(connector_); } TcpClient::~TcpClient() { LOG_INFO << "TcpClient::~TcpClient[" << name_ << "] - connector " << get_pointer(connector_); TcpConnectionPtr conn; { MutexLockGuard lock(mutex_); conn = connection_; } if (conn) { // FIXME: not 100% safe, if we are in different thread // 重新设置TcpConnection中的closeCallback_为detail::removeConnection //这里不用TcpClient::removeConnection的原因是 TcpClient::removeConnection 有重连功能, //这里已经不需要重连了,直接调用detail::removeConnection就行了 
CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1); loop_->runInLoop( boost::bind(&TcpConnection::setCloseCallback, conn, cb)); } else { // 这种情况,说明connector处于未连接状态,将connector_停止 connector_->stop(); // FIXME: HACK loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_)); } } void TcpClient::connect() { // FIXME: check state LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to " << connector_->serverAddress().toIpPort(); connect_ = true; connector_->start(); // 发起连接 } // 用于连接已建立的情况下,关闭连接 void TcpClient::disconnect() { connect_ = false; { MutexLockGuard lock(mutex_); if (connection_) { connection_->shutdown(); } } } // 停止connector_ ,连接尚未成功时进行断开 void TcpClient::stop() { connect_ = false; connector_->stop(); } //有新的连接到来 void TcpClient::newConnection(int sockfd) { loop_->assertInLoopThread(); InetAddress peerAddr(sockets::getPeerAddr(sockfd)); char buf[32]; snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_); ++nextConnId_; string connName = name_ + buf; InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr)); conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback( boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe { MutexLockGuard lock(mutex_); connection_ = conn; // 保存TcpConnection } conn->connectEstablished(); // 这里回调connectionCallback_ } /*移除connector*/ void TcpClient::removeConnection(const TcpConnectionPtr& conn) { loop_->assertInLoopThread(); assert(loop_ == conn->getLoop()); { MutexLockGuard lock(mutex_); assert(connection_ == conn); connection_.reset(); } /*放到IO线程中销毁*/ loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn)); //如果需要重连,则重新连接 if (retry_ && 
connect_) { LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to " << connector_->serverAddress().toIpPort(); // 这里的重连是指连接建立成功之后被断开的重连 connector_->restart(); } }
Connector 头文件
Connector.h
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file, you should not include this. #ifndef MUDUO_NET_CONNECTOR_H #define MUDUO_NET_CONNECTOR_H #include <muduo/net/InetAddress.h> #include <boost/enable_shared_from_this.hpp> #include <boost/function.hpp> #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> namespace muduo { namespace net { class Channel; class EventLoop; // 主动发起连接,带有自动重连功能 class Connector : boost::noncopyable, public boost::enable_shared_from_this<Connector> { public: typedef boost::function<void (int sockfd)> NewConnectionCallback; Connector(EventLoop* loop, const InetAddress& serverAddr); ~Connector(); /*设置连接成功后的回调函数*/ void setNewConnectionCallback(const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } void start(); // can be called in any thread void restart(); // must be called in loop thread void stop(); // can be called in any thread //服务器IP地址 const InetAddress& serverAddress() const { return serverAddr_; } private: enum States { kDisconnected/*断开状态*/, kConnecting/*正在连接中*/, kConnected/*已连接*/ }; static const int kMaxRetryDelayMs = 30*1000; // 30秒,最大重连延迟时间 static const int kInitRetryDelayMs = 500; // 0.5秒,初始状态,连接不上,0.5秒后重连 void setState(States s) { state_ = s; } void startInLoop(); void stopInLoop(); void connect(); void connecting(int sockfd); void handleWrite(); void handleError(); void retry(int sockfd); int removeAndResetChannel(); void resetChannel(); EventLoop* loop_; // 所属EventLoop InetAddress serverAddr_; // 服务器端地址 bool connect_; // atomic是否连接 States state_; // FIXME: use atomic variable boost::scoped_ptr<Channel> channel_; // Connector所对应的Channel NewConnectionCallback newConnectionCallback_; // 连接成功回调函数, int retryDelayMs_; // 重连延迟时间(单位:毫秒) }; } } #endif // MUDUO_NET_CONNECTOR_H
Connector 源文件
Connector.cc
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // #include <muduo/net/Connector.h> #include <muduo/base/Logging.h> #include <muduo/net/Channel.h> #include <muduo/net/EventLoop.h> #include <muduo/net/SocketsOps.h> #include <boost/bind.hpp> #include <errno.h> using namespace muduo; using namespace muduo::net; const int Connector::kMaxRetryDelayMs; Connector::Connector(EventLoop* loop, const InetAddress& serverAddr) : loop_(loop), serverAddr_(serverAddr), connect_(false), state_(kDisconnected), retryDelayMs_(kInitRetryDelayMs) { LOG_DEBUG << "ctor[" << this << "]"; } Connector::~Connector() { LOG_DEBUG << "dtor[" << this << "]"; assert(!channel_); } // 可以跨线程调用 ,把重新发起的连接函数放到IO线程去,所以是线程 // 安全的 void Connector::start() { connect_ = true; loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe } /*重新发起连接*/ void Connector::startInLoop() { loop_->assertInLoopThread(); assert(state_ == kDisconnected); if (connect_) { /*如果为true 重新连接, 如果调用stop 后,那么connect_ = false */ connect(); } else { LOG_DEBUG << "do not connect"; } } /*让IO线程线程停止连接,这个连接处于*/ void Connector::stop() { connect_ = false; loop_->runInLoop(boost::bind(&Connector::stopInLoop, this)); // FIXME: unsafe // FIXME: cancel timer } void Connector::stopInLoop() { loop_->assertInLoopThread(); if (state_ == kConnecting) { setState(kDisconnected); int sockfd = removeAndResetChannel(); // 将通道从poller中移除关注,并将channel置空 retry(sockfd); // 这里并非要重连,只是调用sockets::close(sockfd); } } /*发起连接 */ void Connector::connect() { int sockfd = sockets::createNonblockingOrDie(); // 创建非阻塞套接字 /*进行连接请求*/ int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet()); int savedErrno = (ret == 0) ? 
0 : errno; switch (savedErrno) { case 0: case EINPROGRESS: // 非阻塞套接字,未连接成功返回码是EINPROGRESS表示正在连接 case EINTR: case EISCONN: // 连接成功 connecting(sockfd); break; case EAGAIN: case EADDRINUSE: case EADDRNOTAVAIL: case ECONNREFUSED: case ENETUNREACH: retry(sockfd); // 重连 break; case EACCES: case EPERM: case EAFNOSUPPORT: case EALREADY: case EBADF: case EFAULT: case ENOTSOCK: LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno; sockets::close(sockfd); // 不能重连,关闭sockfd break; default: LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno; sockets::close(sockfd); // connectErrorCallback_(); break; } } // 不能跨线程调用 void Connector::restart() { loop_->assertInLoopThread(); setState(kDisconnected); retryDelayMs_ = kInitRetryDelayMs; connect_ = true; startInLoop(); } void Connector::connecting(int sockfd) { //不管是真正连接还是连接成功了,都把状态设置为kConnecting setState(kConnecting); assert(!channel_); // Channel与sockfd关联 channel_.reset(new Channel(loop_, sockfd)); // 设置可写回调函数,主要是为了“正在连接”状态设置的 channel_->setWriteCallback( boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe // 设置错误回调函数 channel_->setErrorCallback( boost::bind(&Connector::handleError, this)); // FIXME: unsafe // channel_->tie(shared_from_this()); is not working, // as channel_ is not managed by shared_ptr channel_->enableWriting(); // 让Poller关注可写事件,writing一定产生,不管是连接成功还是连接失败, // } int Connector::removeAndResetChannel() { channel_->disableAll(); channel_->remove(); // 从poller移除关注 int sockfd = channel_->fd(); // Can't reset channel_ here, because we are inside Channel::handleEvent // 不能在这里重置channel_,因为正在调用Channel::handleEvent loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe return sockfd; } void Connector::resetChannel() { channel_.reset(); // channel_ 置空 } void Connector::handleWrite() { LOG_TRACE << "Connector::handleWrite " << state_; /*由于在Connector::connecting注册可写事件时,“状态” 被设置为kConnecting(不管完不完成连接), 所以第一次handleWrite时,得把“状态”设置为kConnected*/ if (state_ == 
kConnecting) { /*这里不用再关注可写事件了,这里也是防止busy loop*/ int sockfd = removeAndResetChannel(); // 从poller中移除关注,并将channel置空 // socket可写并不意味着连接一定建立成功 // 还需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR, ...)再次确认一下。 int err = sockets::getSocketError(sockfd); if (err) // 有错误 { LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl(err); retry(sockfd); // 重连 } else if (sockets::isSelfConnect(sockfd)) // 自连接 { LOG_WARN << "Connector::handleWrite - Self connect"; retry(sockfd); // 重连 } else // 连接成功 { setState(kConnected); if (connect_) { newConnectionCallback_(sockfd); // 回调连接成功的回调函数 } else { sockets::close(sockfd); } } } else { // what happened? assert(state_ == kDisconnected); } } /*连接出错的处理函数*/ void Connector::handleError() { LOG_ERROR << "Connector::handleError"; assert(state_ == kConnecting); int sockfd = removeAndResetChannel(); // 从poller中移除关注,并将channel置空 int err = sockets::getSocketError(sockfd); LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err); retry(sockfd); } // 采用back-off策略重连,即重连时间逐渐延长,0.5s, 1s, 2s, ...直至30s void Connector::retry(int sockfd) { sockets::close(sockfd); setState(kDisconnected); if (connect_) { LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort() << " in " << retryDelayMs_ << " milliseconds. "; // 注册一个定时操作,重连 loop_->runAfter(retryDelayMs_/1000.0, boost::bind(&Connector::startInLoop, shared_from_this())); retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs); } else { LOG_DEBUG << "do not connect"; } }
测试程序
#include <muduo/net/Channel.h>
#include <muduo/net/TcpClient.h>

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <string.h>  // strlen
#include <unistd.h>  // getpid

using namespace muduo;
using namespace muduo::net;

// Echo client: every line typed on stdin is sent to the server, every
// message received from the server is printed.
class TestClient
{
 public:
  // loop: event loop driving both the TCP client and the stdin channel.
  // listenAddr: address of the echo server to connect to.
  TestClient(EventLoop* loop, const InetAddress& listenAddr)
    : loop_(loop),
      client_(loop, listenAddr, "TestClient"),
      stdinChannel_(loop, 0)
  {
    client_.setConnectionCallback(
        boost::bind(&TestClient::onConnection, this, _1));
    client_.setMessageCallback(
        boost::bind(&TestClient::onMessage, this, _1, _2, _3));
    //client_.enableRetry();
    // Call handleRead() whenever stdin (fd 0) has data to read.
    stdinChannel_.setReadCallback(boost::bind(&TestClient::handleRead, this));
    stdinChannel_.enableReading();
  }

  // Starts connecting to the server.
  void connect()
  {
    client_.connect();
  }

 private:
  // Reports the connection going up or down.
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }

  // Prints everything the server sent and drains the buffer.
  void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
  {
    string msg(buf->retrieveAllAsString());
    printf("onMessage(): recv a message [%s]\n", msg.c_str());
    LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toFormattedString();
  }

  // Read callback of the stdin channel: reads one line and sends it.
  void handleRead()
  {
    char buf[1024] = {0};
    if (fgets(buf, sizeof buf, stdin) == NULL)
    {
      // EOF or read error: nothing was read.  Stop watching stdin,
      // otherwise it stays readable and this callback runs in a busy loop.
      stdinChannel_.disableAll();
      return;
    }

    // Strip the trailing newline only if there is one; a line longer than
    // the buffer, or the last line before EOF, may come without it.
    size_t len = strlen(buf);
    if (len > 0 && buf[len - 1] == '\n')
    {
      buf[len - 1] = '\0';
    }

    // connection() is null until the connect has succeeded and again after
    // the connection went down.
    TcpConnectionPtr conn = client_.connection();
    if (conn)
    {
      conn->send(buf);
    }
    else
    {
      printf("handleRead(): not connected, input dropped\n");
    }
  }

  EventLoop* loop_;
  TcpClient client_;
  Channel stdinChannel_;  // channel watching standard input
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  EventLoop loop;
  InetAddress serverAddr("127.0.0.1", 8888);
  TestClient client(&loop, serverAddr);
  client.connect();
  loop.loop();
}