This post covers exception handling and timers in Boost.Asio.
1 Exceptions
Boost.Asio offers two ways to handle errors: catching exceptions with try/catch, or using error codes.
Here is an example that uses try/catch:
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <stdexcept> // std::runtime_error

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    // The try/catch wraps run() as a whole: one exception ends this thread's run().
    try
    {
        io_service->run();
    }
    catch( std::exception & ex )
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] Exception: " << ex.what() << std::endl;
        global_stream_lock.unlock();
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void RaiseAnException( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] " << __FUNCTION__ << std::endl;
    global_stream_lock.unlock();

    // Re-post this handler, then throw.
    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    throw( std::runtime_error( "Oops!" ) );
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    worker_threads.join_all();

    return 0;
}
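At first glance the program looks as though it will print the "Oops!" exception message forever, because every handler re-posts itself. In fact the message is printed only twice. The exception propagates out of io_service::run() into the catch block in WorkerThread, so that thread leaves run() and finishes. With two worker threads, each one runs a single handler and then exits. The following modification makes this more obvious: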
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <stdexcept> // std::runtime_error

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    try
    {
        io_service->run();
    }
    catch( std::exception & ex )
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] Exception: " << ex.what() << std::endl;
        global_stream_lock.unlock();
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void RaiseAnException( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] " << __FUNCTION__ << std::endl;
    global_stream_lock.unlock();

    io_service->post( boost::bind( &RaiseAnException, io_service ) );
    io_service->post( boost::bind( &RaiseAnException, io_service ) ); // post one extra time

    throw( std::runtime_error( "Oops!" ) );
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 3; ++x ) // changed to 3
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    worker_threads.join_all();

    return 0;
}
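Posting twice does not produce more output by itself: the number of messages equals the number of worker threads (three here, two if the thread count is left unchanged). The extra posted handlers stay in the queue unprocessed, because each exception ends the thread that was running the handler.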
Next, an example that uses an error code (error variable):
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <stdexcept> // std::runtime_error

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    // Error-code overload of run(): no try/catch around it.
    boost::system::error_code ec;
    io_service->run( ec );

    if( ec )
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] Exception: " << ec << std::endl;
        global_stream_lock.unlock();
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void RaiseAnException( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] " << __FUNCTION__ << std::endl;
    global_stream_lock.unlock();

    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    throw( std::runtime_error( "Oops!" ) );
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    worker_threads.join_all();

    return 0;
}
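Running this program crashes (core dump), because the thrown exception is never caught, and an exception that escapes a thread function terminates the program. The underlying reason is that the error-code overload does not convert user-defined exceptions into error codes; it applies only to errors raised by the Boost.Asio library itself. For those library errors, a call made without an error code throws an exception, while a call made with one reports the failure through the error code instead.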
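When an exception occurs, how it is handled should depend on its kind: a system error or a context-specific error that carries only a text message. For a system error, the exception handler should call stop() so that the io_service shuts down cleanly. For a context error, run() needs to be called again so that the worker thread does not exit. Modifying the example above shows this more clearly; the version below uses a while loop to call run() again: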
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <stdexcept> // std::runtime_error

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    // Keep calling run() until it returns normally.
    while( true )
    {
        try
        {
            boost::system::error_code ec;
            io_service->run( ec );
            if( ec )
            {
                global_stream_lock.lock();
                std::cout << "[" << boost::this_thread::get_id()
                    << "] Error: " << ec << std::endl;
                global_stream_lock.unlock();
            }
            break;
        }
        catch( std::exception & ex )
        {
            // A handler threw: report it and go around the loop again.
            global_stream_lock.lock();
            std::cout << "[" << boost::this_thread::get_id()
                << "] Exception: " << ex.what() << std::endl;
            global_stream_lock.unlock();
        }
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void RaiseAnException( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] " << __FUNCTION__ << std::endl;
    global_stream_lock.unlock();

    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    throw( std::runtime_error( "Oops!" ) );
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    io_service->post( boost::bind( &RaiseAnException, io_service ) );

    worker_threads.join_all();

    return 0;
}
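Running this code prints output continuously and never stops, because every handler re-posts itself and then throws. In a real project this is certainly not what we want.
2 Timer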
Timers were introduced in the first post of this series. Start with an example:
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    while( true )
    {
        try
        {
            boost::system::error_code ec;
            io_service->run( ec );
            if( ec )
            {
                global_stream_lock.lock();
                std::cout << "[" << boost::this_thread::get_id()
                    << "] Error: " << ec << std::endl;
                global_stream_lock.unlock();
            }
            break;
        }
        catch( std::exception & ex )
        {
            global_stream_lock.lock();
            std::cout << "[" << boost::this_thread::get_id()
                << "] Exception: " << ex.what() << std::endl;
            global_stream_lock.unlock();
        }
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void TimerHandler( const boost::system::error_code & error )
{
    if( error )
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] Error: " << error << std::endl;
        global_stream_lock.unlock();
    }
    else
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] TimerHandler " << std::endl;
        global_stream_lock.unlock();
    }
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Press [return] to exit." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    boost::asio::deadline_timer timer( *io_service );
    timer.expires_from_now( boost::posix_time::seconds( 5 ) );
    timer.async_wait( TimerHandler );

    std::cin.get();

    io_service->stop();

    worker_threads.join_all();

    return 0;
}
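Simple enough: the program waits 5 seconds and then runs the handler. To get a timer that can be used repeatedly, the timer could be made global, but a global object shared between threads is not thread-safe. A smart pointer solves the problem: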
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    while( true )
    {
        try
        {
            boost::system::error_code ec;
            io_service->run( ec );
            if( ec )
            {
                global_stream_lock.lock();
                std::cout << "[" << boost::this_thread::get_id()
                    << "] Error: " << ec << std::endl;
                global_stream_lock.unlock();
            }
            break;
        }
        catch( std::exception & ex )
        {
            global_stream_lock.lock();
            std::cout << "[" << boost::this_thread::get_id()
                << "] Exception: " << ex.what() << std::endl;
            global_stream_lock.unlock();
        }
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void TimerHandler(
    const boost::system::error_code & error,
    boost::shared_ptr< boost::asio::deadline_timer > timer
)
{
    if( error )
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] Error: " << error << std::endl;
        global_stream_lock.unlock();
    }
    else
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] TimerHandler " << std::endl;
        global_stream_lock.unlock();

        timer->expires_from_now( boost::posix_time::seconds( 5 ) );
        timer->async_wait( boost::bind( &TimerHandler, _1, timer ) );
    }
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Press [return] to exit." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    boost::shared_ptr< boost::asio::deadline_timer > timer(
        new boost::asio::deadline_timer( *io_service )
    );
    timer->expires_from_now( boost::posix_time::seconds( 5 ) );
    timer->async_wait( boost::bind( &TimerHandler, _1, timer ) );

    std::cin.get();

    io_service->stop();

    worker_threads.join_all();

    return 0;
}
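As shown above, combining a smart pointer with bind lets the timer be used repeatedly. _1 is a placeholder: TimerHandler takes two parameters, the error code and the pointer to the timer, and _1 leaves the first one unbound at bind time. The error code is supplied later, when the handler is invoked.
Now suppose the timer runs on one thread while other work runs on another. If the timer handler and the work handler need to access the same object, running both at the same time is not thread-safe. The way to avoid this is, once again, a strand.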
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    while( true )
    {
        try
        {
            boost::system::error_code ec;
            io_service->run( ec );
            if( ec )
            {
                global_stream_lock.lock();
                std::cout << "[" << boost::this_thread::get_id()
                    << "] Error: " << ec << std::endl;
                global_stream_lock.unlock();
            }
            break;
        }
        catch( std::exception & ex )
        {
            global_stream_lock.lock();
            std::cout << "[" << boost::this_thread::get_id()
                << "] Exception: " << ex.what() << std::endl;
            global_stream_lock.unlock();
        }
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void TimerHandler(
    const boost::system::error_code & error,
    boost::shared_ptr< boost::asio::deadline_timer > timer,
    boost::shared_ptr< boost::asio::io_service::strand > strand
)
{
    if( error )
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id()
            << "] Error: " << error << std::endl;
        global_stream_lock.unlock();
    }
    else
    {
        std::cout << "[" << boost::this_thread::get_id()
            << "] TimerHandler " << std::endl;

        timer->expires_from_now( boost::posix_time::seconds( 1 ) );
        timer->async_wait(
            strand->wrap( boost::bind( &TimerHandler, _1, timer, strand ) )
        );
    }
}

void PrintNum( int x )
{
    std::cout << "[" << boost::this_thread::get_id()
        << "] x: " << x << std::endl;
    boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) );
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );
    boost::shared_ptr< boost::asio::io_service::strand > strand(
        new boost::asio::io_service::strand( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Press [return] to exit." << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
    }

    boost::this_thread::sleep( boost::posix_time::seconds( 1 ) );

    strand->post( boost::bind( &PrintNum, 1 ) );
    strand->post( boost::bind( &PrintNum, 2 ) );
    strand->post( boost::bind( &PrintNum, 3 ) );
    strand->post( boost::bind( &PrintNum, 4 ) );
    strand->post( boost::bind( &PrintNum, 5 ) );

    boost::shared_ptr< boost::asio::deadline_timer > timer(
        new boost::asio::deadline_timer( *io_service )
    );
    timer->expires_from_now( boost::posix_time::seconds( 1 ) );
    timer->async_wait(
        strand->wrap( boost::bind( &TimerHandler, _1, timer, strand ) )
    );

    std::cin.get();

    io_service->stop();

    worker_threads.join_all();

    return 0;
}
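Running this program prints 1 2 3 4 5 in order, followed by TimerHandler. Without the strand, the output of 1 2 3 4 5 could come out interleaved, because std::cout is not protected by a lock in these handlers; that is the thread-safety problem described above. The strand solves it cleanly.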