這個系列Lee哥會分享一些c++中的開源架構。
主要從TCP協定棧、架構、并發性、資料庫、國際化、壓縮、日志、多媒體庫、序列化、XML庫、腳本、Json庫、數學庫、安全、WEB應用架構、網絡庫、異步事件等方面來分享。分享的内容不僅限于自己來寫,也會借鑒網上大牛們的一些文章與大家分享。
之前我們講了TCP,今天我們來講一講架構。這裡Lee哥把架構的技術分成4個子產品。
1.facebook開發和使用的開源c++庫folly
2.高性能網絡應用程式c++11 evented IO LibSourcey
3.企業應用程式開發架構libphenom
4.通用C++庫的集合Boost
對于TCP協定棧想了解的朋友可以看一下
https://www.bilibili.com/video/av57632138
今天分享的是:boost asio 實作一個TCP服務端線程池
原文轉載于 https://blog.csdn.net/hust_wangyajun/article/details/52571752
tcp的伺服器端綁定并監聽端口,如果用戶端比較少,可以對每個用戶端建立一個線程進行通訊處理,但當用戶端的數量比較龐大的時候這種思路就變得不可行,一方面線程切換的開銷太大,另一方面,多數線程并不處于“工作”狀态,
而是長期處于等待事件的狀态。這時,可以使用線程池的架構加快處理速度。
廢話少說,直接上代碼
#include <algorithm>
#include <cstddef>
#include <exception>
#include <iostream>
#include <new>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/aligned_storage.hpp>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
class handler_allocator
: private boost::noncopyable
{
public:
handler_allocator()
: in_use_(false)
}
void* allocate(std::size_t size)
if (!in_use_ && size < storage_.size)
in_use_=true;
return storage_.address();
else
return ::operator new(size);
void deallocate(void* pointer)
if (pointer==storage_.address())
in_use_=false;
::operator delete(pointer);
private:
// Storage space used for handler-based custom memory allocation.
boost::aligned_storage<1024> storage_;
// Whether the handler-based custom allocation storage has been used.
bool in_use_;
};
template
class custom_alloc_handler
custom_alloc_handler(handler_allocator& a, Handler h)
: allocator_(a),
handler_(h)
void operator()(Arg1 arg1)
handler_(arg1);
void operator()(Arg1 arg1, Arg2 arg2)
handler_(arg1, arg2);
friend void* asio_handler_allocate(std::size_t size,
custom_alloc_handler* this_handler)
return this_handler->allocator_.allocate(size);
friend void asio_handler_deallocate(void pointer, std::size_t /size*/,
this_handler->allocator_.deallocate(pointer);
handler_allocator& allocator_;
Handler handler_;
// Helper function to wrap a handler object to add custom allocation.
// Deduces the handler type so callers do not have to spell it out.
template <typename Handler>
inline custom_alloc_handler<Handler> make_custom_alloc_handler(
    handler_allocator& a, Handler h)
{
  return custom_alloc_handler<Handler>(a, h);
}
/// A pool of io_service objects.
class io_service_pool
/// Construct the io_service pool.
explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)
if (pool_size==0)
throw std::runtime_error("io_service_pool size is 0");
// Give all the io_services work to do so that their run() functions will not
// exit until they are explicitly stopped.
for (std::size_t i=0; i < pool_size; ++i)
io_service_ptr io_service(new boost::asio::io_service);
work_ptr work(new boost::asio::io_service::work(*io_service));
io_services_.push_back(io_service);
work_.push_back(work);
// Run all io_service objects in the pool.
void run()
// Create a pool of threads to run all of the io_services.
std::vector > threads;
for (std::size_t i=0; i < io_services_.size(); ++i)
boost::shared_ptr thread(new boost::thread(
boost::bind(&boost::asio::io_service::run, io_services_[i])));
threads.push_back(thread);
// Wait for all threads in the pool to exit.
for (std::size_t i=0; i < threads.size(); ++i)
threads[i]->join();
// Stop all io_service objects in the pool.
void stop()
// Explicitly stop all io_services.
io_services_[i]->stop();
// Get an io_service to use.
boost::asio::io_service& get_io_service()
// Use a round-robin scheme to choose the next io_service to use.
boost::asio::io_service& io_service=*io_services_[next_io_service_];
++next_io_service_;
if (next_io_service_==io_services_.size())
next_io_service_=0;
return io_service;
typedef boost::shared_ptr io_service_ptr;
typedef boost::shared_ptr work_ptr;
/// The pool of io_services.
std::vector io_services_;
/// The work that keeps the io_services running.
std::vector work_;
/// The next io_service to use for a connection.
std::size_t next_io_service_;
class session
: public boost::enable_shared_from_this
session(boost::asio::io_service& work_service
, boost::asio::io_service& io_service)
: socket_(io_service)
, io_work_service(work_service)
tcp::socket& socket()
return socket_;
void start()
boost::system::error_code error;
handle_write(error);
socket_.async_read_some(boost::asio::buffer(data_),
make_custom_alloc_handler(allocator_,
boost::bind(&session::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
void handle_read(const boost::system::error_code& error,
size_t bytes_transferred)
if (!error)
boost::shared_ptr > buf(new std::vector);
buf->resize(bytes_transferred);
std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());
io_work_service.post(boost::bind(&session::on_receive
, shared_from_this(), buf, bytes_transferred));
void handle_write(const boost::system::error_code& error)
char notice[]="Welcome to Connect to Hiper Service";
size_t num;
try
num=socket_.send(boost::asio::buffer(notice));
catch(std::exception &e)
std::cout<<"exception: "<
if(num>0)
std::cout<<"send : "<< notice << std::endl;
void on_receive(boost::shared_ptr > buffers
, size_t bytes_transferred)
char data_stream=&(buffers->begin());
// in here finish the work.
std::cout << "receive :" << bytes_transferred << " bytes." <<
"message :" << data_stream << std::endl;
// The io_service used to finish the work.
boost::asio::io_service& io_work_service;
// The socket used to communicate with the client.
tcp::socket socket_;
// Buffer used to store data received from the client.
boost::array data_;
// The allocator to use for handler-based custom memory allocation.
handler_allocator allocator_;
// Shared-ownership handle to a session.
typedef boost::shared_ptr<session> session_ptr;
class server
server(short port, std::size_t io_service_pool_size)
: io_service_pool_(io_service_pool_size)
, io_service_work_pool_(io_service_pool_size)
, acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
session_ptr new_session(new session(io_service_work_pool_.get_io_service()
, io_service_pool_.get_io_service()));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
void handle_accept(session_ptr new_session,
const boost::system::error_code& error)
new_session->start();
new_session.reset(new session(io_service_work_pool_.get_io_service()
io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
, &io_service_pool_)));
work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
, &io_service_work_pool_)));
io_service_pool_.stop();
io_service_work_pool_.stop();
io_thread_->join();
work_thread_->join();
boost::shared_ptr io_thread_;
boost::shared_ptr work_thread_;
io_service_pool io_service_pool_;
io_service_pool io_service_work_pool_;
tcp::acceptor acceptor_;