现在的位置: 首页 > 综合 > 正文

asio通信的两个小例子

2018年04月02日 ⁄ 综合 ⁄ 共 16381字 ⁄ 字号 评论关闭

ASIO的简单的通信的例子:

服务器端测试可用代码:

 

#include <ctime>

#include <iostream>

#include <string>

#include <boost/bind.hpp>

#include <boost/shared_ptr.hpp>

#include <boost/enable_shared_from_this.hpp>

#include <boost/asio.hpp>

 

using boost::asio::ip::tcp;

 

std::string make_daytime_string()

{

  using
namespace
std; // For time_t, time and ctime;

  time_t now = time(0);

  return
ctime(&now);

}

 

class tcp_connection

 
: public
boost::enable_shared_from_this<tcp_connection>

{

public:

  typedef
boost::shared_ptr<tcp_connection> pointer;

 

  static
pointer create(boost::asio::io_service& io_service)

 
{

   
return
pointer(new tcp_connection(io_service));

 
}

 

  tcp::socket& socket()

 
{

   
return
socket_;

 
}

 

  void
start()

 
{

   
message_ = make_daytime_string();

 

   
boost::asio::async_write(socket_, boost::asio::buffer(message_),

       
boost::bind(&tcp_connection::handle_write, shared_from_this(),

         
boost::asio::placeholders::error,

         
boost::asio::placeholders::bytes_transferred));

 
}

 

private:

  tcp_connection(boost::asio::io_service& io_service)

   
:
socket_(io_service)

 
{

 
}

 

  void
handle_write(const boost::system::error_code& /*error*/,

     
size_t /*bytes_transferred*/)

 
{

 
}

 

  tcp::socket socket_;

  std::string message_;

};

 

class tcp_server

{

public:

  tcp_server(boost::asio::io_service& io_service)

   
:
acceptor_(io_service, tcp::endpoint(tcp::v4(), 13))

 
{

   
start_accept();

 
}

 

private:

  void
start_accept()

 
{

   
tcp_connection::pointer new_connection =

     
tcp_connection::create(acceptor_.io_service());

 

   
acceptor_.async_accept(new_connection->socket(),

       
boost::bind(&tcp_server::handle_accept, this, new_connection,

         
boost::asio::placeholders::error));

 
}

 

  void
handle_accept(tcp_connection::pointer new_connection,

     
const
boost::system::error_code& error)

 
{

   
if (!
error)

   
{

     
new_connection->start();

     
start_accept();

   
}

 
}

 

  tcp::acceptor acceptor_;

};

 

int main()

{

  try

 
{

   
boost::asio::io_service io_service;

   
tcp_server server(io_service);

   
io_service.run();

 
}

  catch
(
std::exception& e)

 
{

   
std::cerr << e.what() << std::endl;

 
}

 

  return
0;

}

 

 

客户端测试可用代码:

#include <iostream>

#include <boost/array.hpp>

#include <boost/asio.hpp>

 

using boost::asio::ip::tcp;

 

int main(int argc, char* argv[])

{

    try

    {

 

        boost::asio::io_service io_service;

 

        tcp::resolver::query query("127.0.0.1", "13");

        tcp::resolver resolver(io_service);

/*

        tcp::resolver::iterator
endpoint_iterator = resolver.resolve(query);

        tcp::resolver::iterator
end;

*/

        tcp::socket socket(io_service);

        boost::system::error_code error = boost::asio::error::host_not_found;

/*

        while
(error && endpoint_iterator != end)

        {

            socket.close();

            socket.connect(*endpoint_iterator++,
error);

        }

*/

        tcp::resolver::iterator iter = resolver.resolve(query);

        socket.connect( *iter,error);

        if
(
error)

            throw
boost::system::system_error(error);

 

        for
(;;)

        {

            boost::array<char,
128>
buf;

            boost::system::error_code error;

 

            size_t len = socket.read_some(boost::asio::buffer(buf), error);

 

            if
(
error == boost::asio::error::eof)

                break;
// Connection closed cleanly by peer.

            else
if (
error)

                throw
boost::system::system_error(error); // Some other error.

 

            std::cout.write(buf.data(), len);

        }

    }

    catch
(
std::exception& e)

    {

        std::cerr << e.what() << std::endl;

    }

 

    return
0;

}

 

 

Asio
的聊天室的小例子:

 

chat_message.hpp

 

#ifndef CHAT_MESSAGE_HPP

#define CHAT_MESSAGE_HPP

 

#include <cstdio>

#include <cstdlib>

#include <cstring>

 

class chat_message

{

public:

    enum
{
header_length = 4 };

    enum
{
max_body_length = 512 };

 

    chat_message()

        :
body_length_(0)

    {

    }

 

    const
char*
data() const

    {

        return
data_;

    }

 

    char*
data()

    {

        return
data_;

    }

 

    size_t length() const

    {

        return
header_length + body_length_;

    }

 

    const
char*
body() const

    {

        return
data_ + header_length;

    }

 

    char*
body()

    {

        return
data_ + header_length;

    }

 

    size_t body_length() const

    {

        return
body_length_;

    }

 

    void
body_length(size_t length)

    {

        body_length_ = length;

        if
(
body_length_ > max_body_length)

            body_length_ = max_body_length;

    }

 

    bool
decode_header()

    {

        using
namespace
std; // For strncat and atoi.

        char
header[header_length + 1] = "";

        strncat(header, data_, header_length);

        body_length_ = atoi(header);

        if
(
body_length_ > max_body_length)

        {

            body_length_ = 0;

            return
false;

        }

        return
true;

    }

 

    void
encode_header()

    {

        using
namespace
std; // For sprintf and memcpy.

        char
header[header_length + 1] = "";

        sprintf(header, "%4d", body_length_);

        memcpy(data_, header, header_length);

    }

 

private:

    char
data_[header_length + max_body_length];

    size_t body_length_;

};

 

#endif // CHAT_MESSAGE_HPP

 

 

Server.cpp

 

#include <algorithm>

#include <cstdlib>

#include <deque>

#include <iostream>

#include <list>

#include <set>

#include <boost/bind.hpp>

#include <boost/shared_ptr.hpp>

#include <boost/enable_shared_from_this.hpp>

#include <boost/asio.hpp>

#include "chat_message.hpp"

 

//整个程序是一个tcp程序

using boost::asio::ip::tcp;

 

//----------------------------------------------------------------------

 

typedef std::deque<chat_message> chat_message_queue;

 

//----------------------------------------------------------------------

 

// 一个抽象类,用于提供聊天室成员的接口

class chat_participant

{

public:

  virtual
~
chat_participant() {}

  virtual
void
deliver(const chat_message& msg) = 0;

};

 

typedef boost::shared_ptr<chat_participant>
chat_participant_ptr;

 

//----------------------------------------------------------------------

 

//聊天室很像QQ里面的组,或者说QQ里面的组就是一种聊天室,

//我们相信聊天室会有许多种,那种两个人私聊的地方也是一种聊天室

//这里面没有实现私聊

class chat_room

{

public:

  void
join(chat_participant_ptr participant)

 
{

   
participants_.insert(participant);

   
//把聊天室内缓存的消息发送给新加入的成员

   
std::for_each(recent_msgs_.begin(), recent_msgs_.end(),

       
boost::bind(&chat_participant::deliver, participant, _1));

 
}

 

  void
leave(chat_participant_ptr participant)

 
{

   
participants_.erase(participant);

 
}

 

  //像聊天室里发送一个消息

  //想象一下,如果聊天室就两个人,那么这就是私聊

  void
deliver(const chat_message& msg)

 
{

   
recent_msgs_.push_back(msg);

   
while (
recent_msgs_.size() > max_recent_msgs)

     
recent_msgs_.pop_front();

 

   
std::for_each(participants_.begin(), participants_.end(),

       
boost::bind(&chat_participant::deliver, _1, boost::ref(msg)));

 
}

 

private:

  std::set<chat_participant_ptr> participants_;

  enum
{
max_recent_msgs = 100 };

  chat_message_queue recent_msgs_;

};

 

//----------------------------------------------------------------------

 

//程序不认识人,它只认识一个一个连接,所以一个连接就代表着一个人

//在聊天室环境下,一个session就是一个成员的加入

class chat_session

 
: public
chat_participant,

   
public
boost::enable_shared_from_this<chat_session>

{

public:

  chat_session(boost::asio::io_service& io_service, chat_room& room)

   
:
socket_(io_service),

     
room_(room)

 
{

 
}

 

  tcp::socket& socket()

 
{

   
return
socket_;

 
}

 

  void
start()

 
{

   
room_.join(shared_from_this());

   
//async_read是事件处理一个机制,使用回调函数从而实现事件处理器方法

   
//本示例大量采用这个机制,也就是异步机制

   
//通过回调函数可以形成一个事件链,即在回调函数中设置一个新的事件与新回调函数

   
boost::asio::async_read(socket_,

       
boost::asio::buffer(read_msg_.data(), chat_message::header_length),

       
boost::bind(

         
&
chat_session::handle_read_header, shared_from_this(),

         
boost::asio::placeholders::error));

 
}

 

  void
deliver(const chat_message& msg)

 
{

   
bool
write_in_progress = !write_msgs_.empty();

   
write_msgs_.push_back(msg);

   
if (!
write_in_progress)

   
{

     
boost::asio::async_write(socket_,

         
boost::asio::buffer(write_msgs_.front().data(),

            write_msgs_.front().length()),

         
boost::bind(&chat_session::handle_write, shared_from_this(),

            boost::asio::placeholders::error));

   
}

 
}

 

  void
handle_read_header(const boost::system::error_code& error)

 
{

   
if (!
error
&&
read_msg_.decode_header())

   
{

     
boost::asio::async_read(socket_,

         
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),

         
boost::bind(&chat_session::handle_read_body, shared_from_this(),

            boost::asio::placeholders::error));

   
}

   
else

   
{

     
room_.leave(shared_from_this());

   
}

 
}

 

  void
handle_read_body(const boost::system::error_code& error)

 
{

   
if (!
error)

   
{

     
room_.deliver(read_msg_);

     
boost::asio::async_read(socket_,

         
boost::asio::buffer(read_msg_.data(), chat_message::header_length),

         
boost::bind(&chat_session::handle_read_header, shared_from_this(),

            boost::asio::placeholders::error));

   
}

   
else

   
{

     
room_.leave(shared_from_this());

   
}

 
}

 

  void
handle_write(const boost::system::error_code& error)

 
{

   
if (!
error)

   
{

     
write_msgs_.pop_front();

     
if (!
write_msgs_.empty())

     
{

       
boost::asio::async_write(socket_,

            boost::asio::buffer(write_msgs_.front().data(),

              write_msgs_.front().length()),

            boost::bind(&chat_session::handle_write, shared_from_this(),

              boost::asio::placeholders::error));

     
}

   
}

   
else

   
{

     
room_.leave(shared_from_this());

   
}

 
}

 

private:

  tcp::socket socket_;

  chat_room& room_;

  chat_message read_msg_;

  //为什么写数据时需要一个队列,而读数据时不需要?

  //因为可能存在需要人同时向一个人发送消息,而一个无论它如何发送消息

  //也可能一条条发送,所有没有并行的可能

  chat_message_queue write_msgs_;

};

 

typedef boost::shared_ptr<chat_session> chat_session_ptr;

 

//----------------------------------------------------------------------

 

//它只做一件事,就是等待客户端连接;当客户端连接时,新建一个session

//然后把实际操作事务交给这个session,接着继续等待客户端连接。

class chat_server

{

public:

  chat_server(boost::asio::io_service& io_service,

     
const
tcp::endpoint& endpoint)

   
:
io_service_(io_service),

     
acceptor_(io_service, endpoint)

 
{

   
//创建一个新的session,让它接受新的连接

   
chat_session_ptr new_session(new chat_session(io_service_, room_));

   
//接受新的连接,在socket上。

   
acceptor_.async_accept(new_session->socket(),

       
boost::bind(&chat_server::handle_accept, this, new_session,

         
boost::asio::placeholders::error));

 
}

 

  void
handle_accept(chat_session_ptr session,

     
const
boost::system::error_code& error)

 
{

   
if (!
error)

   
{

     
session->start();

     
std::cerr <<"lgb" <<std::endl;

     
chat_session_ptr new_session(new chat_session(io_service_, room_));

     
acceptor_.async_accept(new_session->socket(),

         
boost::bind(&chat_server::handle_accept, this, new_session,

            boost::asio::placeholders::error));

   
}

 
}

 

private:

  boost::asio::io_service& io_service_;

  tcp::acceptor acceptor_;

  chat_room room_;

};

 

typedef boost::shared_ptr<chat_server> chat_server_ptr;

typedef std::list<chat_server_ptr>
chat_server_list;

 

//----------------------------------------------------------------------

 

int main(int argc, char* argv[])

{

  try

 
{

   
if (
argc <
2)

   
{

     
std::cerr << "Usage: chat_server <port> [<port>
...]/n"
;

     
return 1;

   
}

 

   
boost::asio::io_service io_service;

 

   
chat_server_list servers;

   
//为每一个端口开启一个服务器对象

   
//让这个对象与这个端口对应

   
for (int
i = 1;
i < argc; ++i)

   
{

     
using namespace
std; // For atoi.

     
tcp::endpoint endpoint(tcp::v4(), atoi(argv[i]));

     
chat_server_ptr server(new chat_server(io_service, endpoint));

     
servers.push_back(server);

   
}

 

   
io_service.run();

 
}

  catch
(
std::exception& e)

 
{

   
std::cerr << "Exception: "
<<
e.what() << "/n";

 
}

 

  return
0;

}

 

 

客户端代码:

#include
<cstdlib>

#include <cstring>
#include <iostream>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include "chat_message.hpp"

//从这里可以看出来,对posix的支持是可选地编译在asio中的
//asio其时是一个纯头文件的库,它没有编译的概念,但是要实现asio的一些功能需要额外的条件
//所以这个测试有时必要的,这是做可移植一种努力
#if
defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)

using boost::asio::ip::tcp;
namespace posix = boost::asio::posix;

class posix_chat_client
{
public:
  posix_chat_client
(boost::asio::io_service& io_service,
      tcp
::resolver::iterator endpoint_iterator)
   
 : socket_(io_service),
      input_
(io_service, ::dup(STDIN_FILENO)),
      output_
(io_service, ::dup(STDOUT_FILENO)),
      input_buffer_
(chat_message::max_body_length)
 
 {
   
 //
Try connecting to the first endpoint.

    tcp
::endpoint endpoint = *endpoint_iterator;
    socket_.
async_connect(endpoint,
        boost
::bind(&posix_chat_client::handle_connect, this,
          boost
::asio::placeholders::error, ++endpoint_iterator));
 
 }

private:

  void handle_connect(const boost::system::error_code& error,
      tcp
::resolver::iterator endpoint_iterator)
 
 {
   
 if (!error)
   
 {
     
 //
Read the fixed-length header of the next message from the server.

     
 //async_xxx一族函数实际上都上范型的,它们会从各种设计好的可读或可写对象中
     
 //执行相应的操作,而不管这些操作实际上是什么
      boost
::asio::async_read(socket_,
          boost
::asio::buffer(read_msg_.data(), chat_message::header_length),
          boost
::bind(&posix_chat_client::handle_read_header, this,
            boost
::asio::placeholders::error));

      //
Read a line of input entered by the user.

      boost
::asio::async_read_until(input_,
input_buffer_,
 '/n',
          boost
::bind(&posix_chat_client::handle_read_input, this,
            boost
::asio::placeholders::error,
            boost
::asio::placeholders::bytes_transferred));
   
 }
   
 else if (endpoint_iterator != tcp::resolver::iterator())
   
 {
     
 //
That endpoint didn't work, try the next one.

      socket_.
close();
      tcp
::endpoint endpoint = *endpoint_iterator;
      socket_.
async_connect(endpoint,
          boost
::bind(&posix_chat_client::handle_connect, this,
            boost
::asio::placeholders::error, ++endpoint_iterator));
   
 }
 
 }

  void handle_read_header(const boost::system::error_code& error)
 
 {
   
 if (!error && read_msg_.decode_header())
   
 {
     
 //
Read the variable-length body of the message from the server.

      boost
::asio::async_read(socket_,
          boost
::asio::buffer(read_msg_.body(), read_msg_.body_length()),
          boost
::bind(&posix_chat_client::handle_read_body, this,
            boost
::asio::placeholders::error));
   
 }
   
 else
   
 {
      close
();
   
 }
 
 }

  void handle_read_body(const boost::system::error_code& error)
 
 {
   
 if (!error)
   
 {
     
 //
Write out the message we just received, terminated by a newline.

     
 static char eol[] = { '/n' };
      boost
::array<boost::asio::const_buffer, 2> buffers = {{
        boost
::asio::buffer(read_msg_.body(), read_msg_.body_length()),
        boost
::asio::buffer(eol) }};
      boost
::asio::async_write(output_,
buffers,

          boost
::bind(&posix_chat_client::handle_write_output, this,
            boost
::asio::placeholders::error));
   
 }
   
 else
   
 {
      close
();
   
 }
 
 }

  void handle_write_output(const boost::system::error_code& error)
 
 {
   
 if (!error)
   
 {
     
 //
Read the fixed-length header of the next message from the server.

      boost
::asio::async_read(socket_,
          boost
::asio::buffer(read_msg_.data(), chat_message::header_length),
          boost
::bind(&posix_chat_client::handle_read_header, this,
            boost
::asio::placeholders::error));
   
 }
   
 else
   
 {
      close
();
   
 }
 
 }

  void handle_read_input(const boost::system::error_code& error,
      std
::size_t length)
 
 {
   
 if (!error)
   
 {
     
 //
Write the message (minus the newline) to the server.

      write_msg_.
body_length(length - 1);
      input_buffer_.
sgetn(write_msg_.body(), length - 1);
      input_buffer_.
consume(1); // Remove newline from input.
      write_msg_.
encode_header();
      boost
::asio::async_write(socket_,
          boost
::asio::buffer(write_msg_.data(), write_msg_.length()),
          boost
::bind(&posix_chat_client::handle_write, this,
            boost
::asio::placeholders::error));
   
 }
   
 else if (error == boost::asio::error::not_found)
   
 {
     
 //
Didn't get a newline. Send whatever we have.

      write_msg_.
body_length(input_buffer_.size());
      input_buffer_.
sgetn(write_msg_.body(), input_buffer_.size());
      write_msg_.
encode_header();
      boost
::asio::async_write(socket_,
          boost
::asio::buffer(write_msg_.data(), write_msg_.length()),
          boost
::bind(&posix_chat_client::handle_write, this,
            boost
::asio::placeholders::error));
   
 }
   
 else
   
 {
      close
();
   
 }
 
 }

  void handle_write(const boost::system::error_code& error)
 
 {
   
 if (!error)
   
 {
     
 //
Read a line of input entered by the user.

      boost
::asio::async_read_until(input_,
input_buffer_,
 '/n',
          boost
::bind(&posix_chat_client::handle_read_input, this,
            boost
::asio::placeholders::error,
            boost
::asio::placeholders::bytes_transferred));
   
 }
   
 else
   
 {
      close
();
   
 }
 
 }

  void close()
 
 {
   
 //
Cancel all outstanding asynchronous operations.

    socket_.
close();
    input_.
close();
    output_.
close();
 
 }

private:
  tcp
::socket socket_;
 
 //这是对POSIX中的文件描述符的对等物
 
 //asio认为POSIX中的用一个short整数来作为文件描述符是单调而不安全的
  posix
::stream_descriptor input_;
  posix
::stream_descriptor output_;
  chat_message read_msg_
;
  chat_message write_msg_
;
 
 //这是一个来自std::streambuf的类型,它是对streambuf的一个定制
 
 //STL的可扩展功能允许我们去这么做
  boost
::asio::streambuf input_buffer_;
};

int main(int argc, char* argv[])
{
 
 try
 
 {
   
 if (argc != 3)
   
 {
      std
::cerr << "Usage:
posix_chat_client <host> <port>
/n";
     
 return 1;
   
 }

    boost::asio::io_service io_service;

    tcp::resolver resolver(io_service);
    tcp
::resolver::query query(argv[1],
argv
[2]);
    tcp
::resolver::iterator iterator = resolver.resolve(query);

    posix_chat_client c(io_service,
iterator
);

    io_service.run();
 
 }
 
 catch (std::exception& e)
 
 {
    std
::cerr << "Exception:
"
 << e.what() << "/n";
 
 }

  return 0;
}

#else //
defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)

int main() {}
#endif //
defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)

【上篇】
【下篇】

抱歉!评论已关闭.