试试Boost.Asio

2023-03-06 13:18:50 浏览数 (2)

慢慢一点一点看看Boost,这段时间就Asio库吧。 据说这货和libevent的效率差不多,但是Boost的平台兼容性,你懂得。还有它帮忙干掉了很多线程安全和线程分发的事情。

Boost.Asio 依赖项:

  1. Boost.System (所以它必须链接boost_system)
  2. [可选] 如果使用read_until() or async_read_until() 函数,则依赖Boost.Regex(boost_regex)
  3. [可选] SSL功能依赖OpenSSL

先来个简单的,系统信号量 Signal控制:

使用ASIO操作信号量有一个注意事项,不允许再使用其他库或工具管理信号量(如signal() 或 sigaction()函数)

代码语言:javascript复制
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <vector>
 
#include <boost/asio.hpp>
#include <boost/bind.hpp>
 
void signal_handler(
    boost::asio::signal_set& stSigs,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }
 
    std::cout<< "Catch a signal: "<< signal_number<< std::endl;
 
    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stSigs), _1, _2));
}
 
void signal_handler2(
    boost::asio::signal_set& stSigs,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }
 
    std::cout<< "Catch a signal - handle 2: "<< signal_number<< std::endl;
 
    stSigs.async_wait(boost::bind(signal_handler2, boost::ref(stSigs), _1, _2));
}
 
int main(int argc, char* argv[]) {
 
    boost::asio::io_service stMainService;
    boost::asio::signal_set stSigs(stMainService);
 
    stSigs.add(SIGINT);
    stSigs.add(SIGTERM);
 
    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stSigs), _1, _2));
    stSigs.async_wait(boost::bind(signal_handler2, boost::ref(stSigs), _1, _2));
 
    stMainService.run();
 
    return 0;
}

So easy吧,可以一次注册多个handler。 需要注意的是每次触发signal的handler后,handler就被取消了,需要重新注册一次。否则下一次就不会跳到这个handler了

第二个尝试,网络IO:

按照文档描述,除非使用宏来禁止功能,网络IO在不同的环境下采用了不同的实现方式

  1. Windows: IOCP
  2. Linux: epoll
  3. Mac OS X, FreeBSD, NetBSD, OpenBSD: kqueue
  4. Solaris: /dev/poll

Boost.Asio的接口是仿IOCP的异步IO的形式的(参见:http://www.cnblogs.com/hello-leo/archive/2011/04/12/leo.html),所以,其他环境下都是模拟的。

代码语言:javascript复制
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>
 
#include <boost/asio.hpp>
#include <boost/random.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/atomic.hpp>
 
 
void signal_handler(
    boost::asio::io_service& stMainService,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }
 
    std::cout<< "Catch a signal: "<< signal_number<< std::endl;
 
    stMainService.stop();
}
 
 
/** 服务器 socket连接表 **/
std::map<boost::asio::ip::tcp::socket::native_handle_type, 
    std::pair<
        boost::shared_ptr<boost::asio::ip::tcp::socket>, 
        boost::shared_ptr<boost::asio::streambuf>
    >
> g_stServerSockMap;
 
boost::atomic_int32_t g_iMaxClientNumber, g_iCurClientNumber;
 
/**
 * 服务器异步发送数据回调函数
 * @param [in] ptrBuffStr 发送的数据buff(传过来仅是为了给智能指针计数 1,防止释放数据的)
 * @param [in] error 错误信息
 * @param [in] bytes_transferred 发送的数据大小
 */
void server_thread_send_handler(
    boost::shared_ptr<std::string> ptrBuffStr,
    const boost::system::error_code& error, // Result of operation.
    std::size_t bytes_transferred
    )
{
    if (error)
    {
        std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Send Failed: "<< error.message()<< std::endl;
    }
}
 
/**
 * 服务器异步接收数据回调函数
 * @param [in] ptrCurSock 收取数据的Socket
 * @param [in] ptrSockStreamBuff 收取数据的Buff对象
 * @param [in] error 错误信息
 */
void server_thread_recv_handler(
    boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock,
    boost::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff,
    const boost::system::error_code& error // Result of operation.
    )
{
    // 5.2.1 先接收buff长度
    if (ptrSockStreamBuff->size() == sizeof(size_t))
    {
        const size_t* pLen = boost::asio::buffer_cast<const size_t*>(ptrSockStreamBuff->data());
        boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(*pLen), boost::bind(
            server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
        ));
 
        // 移出socket buff
        ptrSockStreamBuff->consume(sizeof(size_t));
        return;
    }
 
    // 5.2.2 再接收buff数据
    if (ptrSockStreamBuff->size() > 0)
    {
        // 5.2.3 处理数据
        boost::asio::streambuf::const_buffers_type bufs = ptrSockStreamBuff->data();
        std::string strOut(boost::asio::buffers_begin(bufs), boost::asio::buffers_begin(bufs)   ptrSockStreamBuff->size());
        strOut  = (char)(0);
        std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->native_handle()<< " Recv Data: "<< strOut<< std::endl;
        
        // // 5.2.4 移出socket buff
        ptrSockStreamBuff->consume(ptrSockStreamBuff->size());
 
        // Sleep 500 毫秒。模拟正忙
        boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
 
        // 5.2.5 回包
        // 组装数据
        std::stringstream stServerSendBuf;
        stServerSendBuf << "Server Thread "<< boost::this_thread::get_id()<< " Say Got{ "<< strOut<< " }";
        boost::shared_ptr<std::string> ptrBuffStr = boost::shared_ptr<std::string>(new std::string(stServerSendBuf.str()));
 
        // 先发Buff长度
        size_t uBufLen = ptrBuffStr->size();
        boost::asio::async_write(*ptrCurSock, boost::asio::buffer(&uBufLen, sizeof(uBufLen)), boost::asio::transfer_exactly(sizeof(uBufLen)),
            boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
        );
 
        // 再发数据
        boost::asio::async_write(*ptrCurSock, boost::asio::buffer(*ptrBuffStr), boost::asio::transfer_exactly(ptrBuffStr->size()),
            boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
        );
 
        // 5.2.6 重新监听
        boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
            server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
        ));
    }
 
    // 5.2.7 关闭退出
    if (error == boost::asio::error::eof)
    {
        std::cout<< "xxxx Server Thread "<< boost::this_thread::get_id()<< " Sock Release: "<< ptrCurSock->native_handle()<< std::endl;
        g_stServerSockMap.erase(ptrCurSock->native_handle());
 
        g_iCurClientNumber.fetch_sub(1);
        return;
    }
 
    if (error)
    {
        std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Recv Failed: "<< error.message()<< std::endl;
        return;
    }
}
 
/**
 * 服务器异步接受Socket回调函数
 * @param [in] stMainService 服务对象
 * @param [in] stAccepter 接受Socket
 * @param [in] ptrCurSock 建立连接的Socket
 * @param [in] error 错误信息
 */
void server_thread_accept_handler(
    boost::asio::io_service& stMainService,
    boost::asio::ip::tcp::acceptor& stAccepter,
    boost::shared_ptr<boost::asio::ip::tcp::socket>& ptrCurSock,
    const boost::system::error_code& error // Result of operation.
    )
{
    if (error)
    {
       std::cout<< "     Server Accept Failed: "<< error.message()<< std::endl;
       return;
    }
 
    std::cout<< "     Server Thread "<< boost::this_thread::get_id()<< " Accept a socket: "<< ptrCurSock->native_handle()<< std::endl;
    std::cout<< "tServer Thread Local End Point:"<< std::endl<<
         "ttServer Thread Address: "<< ptrCurSock->local_endpoint().address().to_string()<< std::endl<< 
         "ttServer Thread Port: "<< ptrCurSock->local_endpoint().port()<< std::endl<< 
         "ttServer Thread Protocol: "<< ptrCurSock->local_endpoint().protocol().protocol()<< std::endl;
    std::cout<< "tServer Thread Remote End Point:"<< std::endl<<
        "ttServer Thread Address: "<< ptrCurSock->remote_endpoint().address().to_string()<< std::endl<< 
         "ttServer Thread Port: "<< ptrCurSock->remote_endpoint().port()<< std::endl<< 
         "ttServer Thread Protocol: "<< ptrCurSock->remote_endpoint().protocol().protocol()<< std::endl;
    
    g_iCurClientNumber.fetch_add(1);
    g_iMaxClientNumber = std::max(g_iMaxClientNumber.load(), g_iCurClientNumber.load());
 
    // Step 5.1 接受链入的Socket
    g_stServerSockMap[ptrCurSock->native_handle()] = std::make_pair(
        ptrCurSock, 
        boost::shared_ptr<boost::asio::streambuf>(
            new boost::asio::streambuf()
        )
    );
 
    // Step 5.2 设置Socket接收数据回调
    boost::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff = g_stServerSockMap[ptrCurSock->native_handle()].second;
    boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
        server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
    ));
 
    // Step 5.3 创建Socket, 接受新Socket
    ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
        new boost::asio::ip::tcp::socket(stMainService)
    );
 
    stAccepter.async_accept(*ptrCurSock, boost::bind(
        server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
    ));
}
 
/**
 * 服务器工作线程
 * @param [in] stMainService 服务对象
 */
void server_thread(boost::asio::io_service& stMainService)
{
    std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;
    stMainService.run(); // 这样,就加到线程池中去了
}
 
/**
 * 服务器主线程
 * @param [in] server_num 服务线程数
 */
void server_main_thread(int server_num)
{
    // Step 1. 创建服务对象
    static boost::asio::io_service stMainService;
 
    // Step 2. 创建地址生成器及生成地址
    boost::asio::ip::tcp::resolver stResolver(stMainService);
    // 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
    boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("127.0.0.1", "8731"));
 
    // Step 3. 创建接收器
    boost::asio::ip::tcp::acceptor stAccepter(stMainService, stEndpoint);
 
    // Step 4. 创建Socket
    boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
        new boost::asio::ip::tcp::socket(stMainService)
    );
 
    stAccepter.async_accept(*ptrCurSock, boost::bind(
        server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
    ));
 
    // Step 5. 创建线程并加入到服务线程池
    std::vector<boost::thread*> stls;
    for (int i = 0; i < server_num;    i)
    {
        // 打印输出错开
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        stls.push_back(
            new boost::thread(server_thread, boost::ref(stMainService))
        );
    }
 
    // Ctrl   C退出服务
    boost::asio::signal_set stSigs(stMainService);
    stSigs.add(SIGINT);
    stSigs.add(SIGTERM);
    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));
 
    // Step 6. 阻塞主线程,等待服务线程退出
    for (size_t i = 0; i < stls.size();    i)
    {
        stls[i]->join();
        delete stls[i];
    }
}
 
/**
 * 客户端网络线程函数
 */
void client_thread()
{
    std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;
 
    // Step 1. 创建服务对象
    static boost::asio::io_service stMainService;
 
    // Step 2. 创建地址生成器及生成地址
    boost::asio::ip::tcp::resolver stResolver(stMainService);
    // 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
    boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("127.0.0.1", "8731"));
 
    // Step 3. 创建Socket
    boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
        new boost::asio::ip::tcp::socket(stMainService)
    );
 
    // Step 4. 连接服务器
    ptrCurSock->connect(stEndpoint);
    std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Connect Success => "<< ptrCurSock->local_endpoint().port()<< std::endl;
 
    //** 睡眠一段时间再发数据
    static boost::random::mt19937 rng;
    static boost::random::uniform_int_distribution<> sleep_rdm(0,5000);
    int iSleepFor = 1000   sleep_rdm(rng);
    boost::this_thread::sleep_for(boost::chrono::milliseconds(iSleepFor));
 
    // ** 构造数据
    boost::asio::streambuf stSendedBuff;
    std::ostream stSendedBuffOS(&stSendedBuff);
    stSendedBuffOS<< "---- Client Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->local_endpoint().port()<< " Say Hello";
 
    // Step 5 客户端同步发送数据
    size_t ullBufSize = stSendedBuff.size();
    boost::asio::write(*ptrCurSock, boost::asio::buffer(&ullBufSize, sizeof(ullBufSize)), boost::asio::transfer_exactly(sizeof(ullBufSize)));
    boost::asio::write(*ptrCurSock, stSendedBuff, boost::asio::transfer_at_least(stSendedBuff.size() - sizeof(ullBufSize)));
    // sent data is removed from input sequence
    stSendedBuff.consume(stSendedBuff.size()); 
 
    std::cout<< "**** Client Thread "<< boost::this_thread::get_id()<< " Sended"<< std::endl;
 
    // Step 6 客户端同步接收数据
    char strRecvStr[2048];
    ptrCurSock->read_some(boost::asio::buffer(strRecvStr, sizeof(size_t)));
    ptrCurSock->read_some(boost::asio::buffer(strRecvStr   sizeof(size_t), *((size_t*)(strRecvStr))));
    std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Recv Data: "<< strRecvStr   sizeof(size_t)<< std::endl;
 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(10000 - iSleepFor));
 
    // Step 7. 等待服务关闭
    stMainService.run();
 
    // Step 8. Socket 关闭退出
    ptrCurSock->close();
}
 
void print_asio_info();
 
int main(int argc, char* argv[]) {
    print_asio_info();
 
    g_iMaxClientNumber.store(0);
    g_iCurClientNumber.store(0);
 
    boost::thread st(server_main_thread, 3);
    std::vector<boost::thread*> ctl;
 
    const int max_client_num = 128;
    for (int i = 0; i < max_client_num;    i)
    {
        // 打印输出错开
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        ctl.push_back(new boost::thread(client_thread));
    }
 
    st.join();
    for (size_t i = 0; i < ctl.size();    i)
    {
        ctl[i]->join();
        delete ctl[i];
    }
 
    std::cout<< "All done, max clients: "<< g_iMaxClientNumber.load()<< std::endl;
 
    print_asio_info();
 
    return 0;
}
 
void print_asio_info()
{
    #if defined(BOOST_ASIO_HAS_IOCP)
        std::cout<< "Boost.Asio using iocp"<< std::endl;
 
        #if defined(BOOST_ASIO_HAS_SERIAL_PORT)
            std::cout<< "tSerial Port: enabled"<< std::endl;
        #else
            std::cout<< "tSerial Port: disabled"<< std::endl;
        #endif
 
        #if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE)
            std::cout<< "tStream handle: enabled"<< std::endl;
        #else
            std::cout<< "tStream handle: disabled"<< std::endl;
        #endif
 
        #if defined(BOOST_ASIO_HAS_WINDOWS_RANDOM_ACCESS_HANDLE)
            std::cout<< "tWindows Random Access Handle: enabled"<< std::endl;
        #else
            std::cout<< "tWindows Random Access Handle: disabled"<< std::endl;
        #endif
 
        #if defined(BOOST_ASIO_HAS_WINDOWS_OBJECT_HANDLE)
            std::cout<< "tObject Handle: enabled"<< std::endl;
        #else
            std::cout<< "tObject Handle: disabled"<< std::endl;
        #endif
 
        #if defined(BOOST_ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
            std::cout<< "tWindows Overlapped Ptr: enabled"<< std::endl;
        #else
            std::cout<< "tWindows Overlapped Ptr: disabled"<< std::endl;
        #endif
    #elif defined(BOOST_ASIO_HAS_EPOLL)
        std::cout<< "Boost.Asio using epoll."<< std::endl;
        #if defined(BOOST_ASIO_HAS_EVENTFD)
            std::cout<< "teventfd: enabled"<< std::endl;
        #else
            std::cout<< "teventfd: disabled"<< std::endl;
        #endif
        #if defined(BOOST_ASIO_HAS_TIMERFD)
            std::cout<< "timerfd: enabled"<< std::endl;
        #else
            std::cout<< "timerfd: disabled"<< std::endl;
        #endif
    #elif defined(BOOST_ASIO_HAS_KQUEUE)
        std::cout<< "Boost.Asio using kqueue"<< std::endl;
    #elif defined(BOOST_ASIO_HAS_DEV_POLL)
        std::cout<< "Boost.Asio using solaris: /dev/poll"<< std::endl;
    #endif
 
    #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
        std::cout<< "Posix Stream Descriptor: enabled"<< std::endl;
    #else
        std::cout<< "Posix Stream Descriptor: disabled"<< std::endl;
    #endif
 
    #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
        std::cout<< "Unix domain socket: enabled"<< std::endl;
    #else
        std::cout<< "Unix domain socket: disabled"<< std::endl;
    #endif
 
    #if defined(BOOST_ASIO_HAS_SIGACTION)
        std::cout<< "sigaction: enabled"<< std::endl;
    #else
        std::cout<< "sigaction: disabled"<< std::endl;
    #endif
}

这个搞起来相当费劲,经常需要跳转到Boost的源码中,查看一些回调函数的定义式write和write_some函数在completion_condition返回0时才发送,否则将数据加入到发送窗口,并且没有发生数据拷贝,也就是说,如果是异步操作,开发者必须保证发送时数据有效。(这类函数默认的completion_condition是仿函数transfer_all,返回default_max_transfer_size: 65536) 同样,read和read_some也是这样。 Send和receive函数才是立即执行的(不推荐使用)。

另外,streambuf流用于管理发送或接收缓冲,但是在发送或接收完后,要执行consume函数移出或commit移入缓冲区,否则数据不会被销毁。

UDP和TCP的类似,我就不再多写一个demo了。 以上sample的client和server的读数据采用了两种不同的方式

有一点比较爽,在多线程条件下 io_service的run函数是线程安全的,也就是说,多个线程调用同一个run的时候,就自动被加入工作线程池,在消息到来的时候io_service会找到一个可用的线程进行处理。

注:以上代码Visual Studio中需要包含Boost的include目录和lib目录;GCC或Clang中需要加编译选项-I[BOOST_PREFIX目录]/include –L[BOOST_PREFIX目录]/lib -lboost_random -lboost_thread -lboost_system -lstdc -lrt -pthread -D_POSIX_MT_, 如果BOOST不在系统动态库查找目录中且同时存在Boost的动态和静态库则加上 -static 选项

另外,这个demo代码包含一些功能检测的代码。

小试牛的第三刀,定时器Timer:

还是喜欢上代码,so…

代码语言:javascript复制
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>
 
#include <boost/chrono.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
 
 
void signal_handler(
    boost::asio::io_service& stMainService,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }
 
    std::cout<< "Catch a signal: "<< signal_number<< std::endl;
 
    stMainService.stop();
}
 
 
void deadline_timer_handler(
  const boost::system::error_code& error // Result of operation.
)
{
    std::cout<< "On Deadline Timer Actived"<< std::endl;
}
 
void waitable_timer_handler(
  const boost::system::error_code& error // Result of operation.
)
{
    std::cout<< "On Waitable Timer Actived"<< std::endl;
}
 
void steady_timer_handler(
  boost::asio::basic_waitable_timer<boost::chrono::steady_clock>& stSteadyTimer,
  const boost::system::error_code& error // Result of operation.
)
{
    std::cout<< "On Steady Timer Actived"<< std::endl;
    if (error == boost::asio::error::operation_aborted)
    {
        std::cout<< "On Steady Timer Abouted"<< std::endl;
    }
    else
    {
        stSteadyTimer.expires_at(stSteadyTimer.expires_at()   boost::chrono::seconds(1));
        stSteadyTimer.async_wait(boost::bind(steady_timer_handler, boost::ref(stSteadyTimer), _1));
    }
}
 
int main(int argc, char* argv[]) {
    
    // Step 1. 创建服务对象
    boost::asio::io_service stMainService;
 
    // =========== deadline timer ===========
    // Step 2(1) 创建Timer对象
    boost::asio::deadline_timer stDeadlineTimer(stMainService);
    stDeadlineTimer.expires_from_now(boost::posix_time::seconds(2));
 
    // Step 3(1) 设置Timer回调
    stDeadlineTimer.async_wait(deadline_timer_handler);
 
    // =========== high_resolution_clock timer ===========
    // Step 2(2) 创建Timer对象
    boost::asio::basic_waitable_timer<boost::chrono::high_resolution_clock> stWaitableTimer(stMainService);
    stWaitableTimer.expires_from_now(boost::chrono::seconds(1));
 
    // Step 3(2) 设置Timer回调
    stWaitableTimer.async_wait(waitable_timer_handler);
 
    // =========== steady timer ===========
    // Step 2(3) 创建Timer对象
    boost::asio::basic_waitable_timer<boost::chrono::steady_clock> stSteadyTimer(stMainService);
    stSteadyTimer.expires_from_now(boost::chrono::seconds(1));
 
    // Step 3(3) 设置Timer回调
    stSteadyTimer.async_wait(boost::bind(steady_timer_handler, boost::ref(stSteadyTimer), _1));
 
    // =========== system_timer ===========
    // =========== 对应 chrono::system_clock 懒得演示了,都差不多,只是时间单位和载体不同而已 ===========
 
    // 绑定 Ctrl   C退出服务
    boost::asio::signal_set stSigs(stMainService);
    stSigs.add(SIGINT);
    stSigs.add(SIGTERM);
    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));
 
    stMainService.run();
 
    return 0;
}

话说Boost.Asio每次异步wait回调之后还要重新wait一下挺麻烦的

额外功能:

设备文件支持

boost::asio::serial_port 可以打开一个Unix设备文件,并作为输入输出流,然后可以用自由函数boost::asio::read(),boost::asio::async_read(),boost::asio::write(),boost::asio::async_write(),boost::asio::read_until() 和 boost::asio::async_read_until()操作这些文件

在Windows上,需要系统支持I/O completion port时才能使用,可以通过BOOST_ASIO_HAS_SERIAL_PORTS 这个宏来检测是否可用这个功能(如果定义了则可用)。

Unix系统功能

第一项是Unix Domain Sockets

这种socket可以通过

代码语言:javascript复制
local::datagram_protocol::socket socket1(my_io_service);
local::datagram_protocol::socket socket2(my_io_service);
local::connect_pair(socket1, socket2);
//或者
//服务端
local::stream_protocol::endpoint ep("/tmp/foobar");
local::stream_protocol::acceptor acceptor(my_io_service, ep);
local::stream_protocol::socket socket(my_io_service);
acceptor.accept(socket);
//客户端
local::stream_protocol::endpoint ep("/tmp/foobar");
local::stream_protocol::socket socket(my_io_service);
socket.connect(ep);

来使用,其他的部分和普通的Socket一样

第二项是指向流的文件描述符

posix::stream_descriptor in(my_io_service, ::dup(STDIN_FILENO)); posix::stream_descriptor out(my_io_service, ::dup(STDOUT_FILENO));

创建出的descriptor可以用asio的自由函数的读写函数操作

第三项是fork支持通过notify_fork函数来重建内部描述符

SSL支持

这部分依赖OpenSSL,简单的说,就是在socket外面包了一层,然后操作带ssl的socket。

大概就是

代码语言:javascript复制
typedef ssl::stream<tcp::socket> ssl_socket;

然后用ssl_socket代替tcp::socket

简要性能测试

  • 测试机器: CPU Intel(R) Xeon(R) CPU X3440 @ 2.53GHz × 8 , 内存 8096MB
  • 测试环境: Linux 2.6.32.43, GCC 4.8.0, 编译优化配置 -O2 -g -ggdb
  • 程序配置: 16 工作进程,1主进程,主线程参与逻辑
  • 测试结果: 5000-8000连接,每秒收到约320K个报文,7MB流量,每秒发送约320K个报文,12MB流量, CPU 负载: 180%(5000连接) – 195% (8000连接)

结论: 不知道为什么,压力再也上不去了, 我是把输出重定向到文件的,所以开始以为是磁盘IO爆了,写出日志次数太多,但是gperftools显示磁盘IO占用并不高,性能分布还是比较平均的。但是基本上就在16万个报文了(每个包有一次发送长度的包[4字节]和一次数据的send[不定长])

测试代码地址: https://gist.github.com/owent/5660983 profile性能分析报告: http://api.owent.net/resource/doc/link/test.asio.pdf

主要就是这个样子,一些更底层的东西比如srand可以以后再看( * ^ _ ^ * )

0 人点赞