快速了解Boost.Asio 的多线程模型
Boost.Asio有两种支持多线程的方式,第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_service,并且每个线程都调用各自的io_service的run()方法。
另一种支持多线程的方式:全局只分配一个io_service,并且让这个io_service在多个线程之间共享,每个线程都调用全局的io_service的run()方法。
每个线程一个I/OService
让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_service(通常的做法是,让线程数和CPU核心数保持一致)。那么这种方案有什么特点呢?
1 在多核的机器上,这种方案可以充分利用多个CPU核心。
2 某个socket描述符并不会在多个线程之间共享,所以不需要引入同步机制。
3 在eventhandler中不能执行阻塞的操作,否则将会阻塞掉io_service所在的线程。
下面我们实现了一个AsioIOServicePool,封装了线程池的创建操作:
classAsioIOServicePool
{
public:
usingIOService=boost::asio::io_service;
usingWork=boost::asio::io_service::work;
usingWorkPtr=std::unique_ptr;
AsioIOServicePool(std::size_tsize=std::thread::hardware_concurrency())
:ioServices_(size),
works_(size),
nextIOService_(0)
{
for(std::size_ti=0;i(newWork(ioServices_[i]));
}
for(std::size_ti=0;iioServices_;
std::vectorworks_;
std::vectorthreads_;
std::size_tnextIOService_;
};
AsioIOServicePool使用起来也很简单:
std::mutexmtx;//protectstd::cout
AsioIOServicePoolpool;
boost::asio::steady_timertimer{pool.getIOService(),std::chrono::seconds{2}};
timer.async_wait([&mtx](constboost::system::error_code&ec)
{
std::lock_guardlock(mtx);
std::cout<<"Hello,World!"<
一个I/OService与多个线程
另一种方案则是先分配一个全局io_service,然后开启多个线程,每个线程都调用这个io_service的run()方法。这样,当某个异步事件完成时,io_service就会将相应的eventhandler交给任意一个线程去执行。
然而这种方案在实际使用中,需要注意一些问题:
1 在eventhandler中允许执行阻塞的操作(例如数据库查询操作)。
2 线程数可以大于CPU核心数,譬如说,如果需要在eventhandler中执行阻塞的操作,为了提高程序的响应速度,这时就需要提高线程的数目。
3 由于多个线程同时运行事件循环(eventloop),所以会导致一个问题:即一个socket描述符可能会在多个线程之间共享,容易出现竞态条件(racecondition)。譬如说,如果某个socket的可读事件很快发生了两次,那么就会出现两个线程同时读同一个socket的问题(可以使用strand解决这个问题)。
下面实现了一个线程池,在每个worker线程中执行io_service的run()方法:
classAsioThreadPool
{
public:
AsioThreadPool(intthreadNum=std::thread::hardware_concurrency())
:work_(newboost::asio::io_service::work(service_))
{
for(inti=0;iwork_;
std::vectorthreads_;
};
无锁的同步方式
要怎样解决前面提到的竞态条件呢?Boost.Asio提供了io_service::strand:如果多个eventhandler通过同一个strand对象分发(dispatch),那么这些eventhandler就会保证顺序地执行。
例如,下面的例子使用strand,所以不需要使用互斥锁保证同步了:
AsioThreadPoolpool(4);//开启4个线程
boost::asio::steady_timertimer1{pool.getIOService(),std::chrono::seconds{1}};
boost::asio::steady_timertimer2{pool.getIOService(),std::chrono::seconds{1}};
intvalue=0;
boost::asio::io_service::strandstrand{pool.getIOService()};
timer1.async_wait(strand.wrap([&value](constboost::system::error_code&ec)
{
std::cout<<"Hello,World!"<
多线程EchoServer
下面的EchoServer可以在多线程中使用,它使用asio::strand来解决前面提到的竞态问题:
classTCPConnection:publicstd::enable_shared_from_this
{
public:
TCPConnection(boost::asio::io_service&io_service)
:socket_(io_service),
strand_(io_service)
{}
tcp::socket&socket(){returnsocket_;}
voidstart(){doRead();}
private:
voiddoRead()
{
autoself=shared_from_this();
socket_.async_read_some(
boost::asio::buffer(buffer_,buffer_.size()),
strand_.wrap([this,self](boost::system::error_codeec,
std::size_tbytes_transferred)
{
if(!ec){doWrite(bytes_transferred);}
}));
}
voiddoWrite(std::size_tlength)
{
autoself=shared_from_this();
boost::asio::async_write(
socket_,boost::asio::buffer(buffer_,length),
strand_.wrap([this,self](boost::system::error_codeec,
std::size_t/*bytes_transferred*/)
{
if(!ec){doRead();}
}));
}
private:
tcp::socketsocket_;
boost::asio::io_service::strandstrand_;
std::arraybuffer_;
};
classEchoServer
{
public:
EchoServer(boost::asio::io_service&io_service,unsignedshortport)
:io_service_(io_service),
acceptor_(io_service,tcp::endpoint(tcp::v4(),port))
{
doAccept();
}
voiddoAccept()
{
autoconn=std::make_shared(io_service_);
acceptor_.async_accept(conn->socket(),
[this,conn](boost::system::error_codeec)
{
if(!ec){conn->start();}
this->doAccept();
});
}
private:
boost::asio::io_service&io_service_;
tcp::acceptoracceptor_;
};
以上就是快速了解Boost.Asio的多线程模型的详细内容,更多关于c++Boost.Asio多线程模型的资料请关注毛票票其它相关文章!