为什么要学习《精进C++》?

2022-12-04 16:24:11 浏览数 (1)

在没学习《精进C 》课程完整版上线了之前,大家先来看看下面这段代码。是否上头?挠头?不知所云?

代码语言:javascript复制
       template<typename F, typename... Args>
        auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> 
        {

            std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

            auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

            std::function<void()> warpper_func = [task_ptr]()
            {
                (*task_ptr)();
            };

            m_queue.push(warpper_func);
            m_conditional_lock.notify_one();
            return task_ptr->get_future();
        }

下面就是利用以上内容编写的线程池模板,在实际工程中,发挥至关重要的作用。

1线程池模板

//为什么是任务队列?

//希望任务以发送它相同的顺序逐个执行

//注意事项

//1,线程池中的线程会持续查询任务队列是否有可用工作,当两个甚至多个线程试图同时执行查询工作时,就会引起灾难

//因此,需要对std::queue进行包装,实现一个线程安全的任务队列:利用mutex来限制并发访问

//https://zhuanlan.zhihu.com/p/367309864

代码语言:javascript复制
template<typename T>
class SafeQueue
{
    private:
        //利用模板函数的构造队列
        std::queue<T> m_queue;
        //访问锁
        std::mutex m_mutex;
    public:
        SafeQueue(){}
        SafeQueue(SafeQueue &&other){}
        ~SafeQueue(){}

        //返回队列是否为空
        bool empty()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            return m_queue.empty();
        }
        //返回队列大小
        int size()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            return m_queue.size();
        }
        //队列添加元素
        void push(T &t)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_queue.emplace(t);
        }
        //队列取出元素
        bool pop(T &t)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            if(m_queue.empty())
            {
                return false;
            }

            //取出队首元素,并进行右值引用
            t = std::move(m_queue.front());
            m_queue.pop();//弹出入队的第一个元素

            return true;
        }
};

//线程池

//1,提交函数:负责向任务队列中添加任务

//1.1 接受任何参数的任何函数 (普通函数,lambda,成员函数.......)

//1.2 立即返回东西,避免阻塞主线程

代码语言:javascript复制
class ThreadPool
{
    private:
        //内置工作线程类
        class ThreadWorker
        {
            private:
                //工作id
                int m_id;
                //所属线程池
                ThreadPool *m_pool;
            public:
                //构造函数
                ThreadWorker(ThreadPool *pool, const int id):m_pool(pool),m_id(id){}

                //重载 ()操作
                void operator()()
                {
                    //基础类 func
                    std::function<void()> func;
                    //是否正在取出队列中的元素
                    bool dequeued;

                    while(!m_pool->m_shutdown)
                    {
                        {
                            //枷锁
                            std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
                            
                            //如果任务队列为空,阻塞当前线程
                            if(m_pool->m_queue.empty())
                            {
                                //等待条件变量来通知
                                m_pool->m_conditional_lock.wait(lock);
                            }

                            //取出任务队列中的元素
                            dequeued = m_pool->m_queue.pop(func);

                        }

                        //如果成功取出,执行工作函数
                        if(dequeued)
                        {
                            func();
                        }
                    }

                }
        };

        //线程池是否关闭
        bool m_shutdown;
        //执行函数安全队列
        SafeQueue<std::function<void()>> m_queue;
        //工作线程队列
        std::vector<std::thread> m_threads;
        //线程休眠互斥
        std::mutex m_conditional_mutex;
        //环境锁,可以让线程处于休眠或唤醒状态
        std::condition_variable m_conditional_lock;

    public:
        
        //线程池构造函数
        ThreadPool(const int n_threads =4): m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
        {

        }
        ThreadPool(const ThreadPool&) = delete;
        ThreadPool(ThreadPool &&) = delete;
        ThreadPool &operator=(const ThreadPool &) = delete;
        ThreadPool &operator=(ThreadPool &&) = delete;

        //初始化线程池
        void init()
        {
            for(int i=0; i < m_threads.size();   i)
            {
                //分配工作线程
                m_threads.at(i) = std::thread(ThreadWorker(this,i));
            }
        }

        //等待线程结束当前任务 关闭线程池
        void shutdown()
        {
            m_shutdown = true;
            //通知,唤醒所有线程
            m_conditional_lock.notify_all();

            for(int i = 0; i < m_threads.size();  i)
            {
                //判断线程是否在在等待
                if(m_threads.at(i).joinable())
                {
                    //将线程加入到等待队列
                    m_threads.at(i).join();
                }
            }
        }
        

        //1, 提交函数
        template<typename F, typename... Args>
        auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> //尾返回类型的推导,该函数的返回值会从 std::future<decltype(f(args...))中自动推导出来
        {
            //连接函数和参数定义,特殊函数类型,避免左右错误
            //std::function 进行包装产生一个特殊函数,对多个相似的函数进行包装,可以hold任何通过 ()来调用的对象,使用std::bind将函数f和参数args绑定起来
            std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
            
            //decltype 解决了auto关键字只能对变量类型进行型别推导的缺陷
            //std::packaged_task可以用来封装可以调用的目标,从而实现异步的调用,func作为他的实例化参数
            auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
            
            //再次利用std::function 将task_ptr指向的std::packaged_task对象取出并包装为 void函数
            std::function<void()> warpper_func = [task_ptr]()
            {
                (*task_ptr)();
            };

            //队列通用安全压入队列
            m_queue.push(warpper_func);

            //唤醒一个等待中的线程
            //为解决死锁而生,当互斥操作不够引入。也就是当条件不满足时 某个线程不能继续执行,此时
            //std::condition_variable实例被创建出现就是用于唤醒等待线程从而避免 死锁
            m_conditional_lock.notify_one();

            //返回先前注册的任务指针
            return task_ptr->get_future();

            //该函数的结果是:获取返回类型为实例为 f(arg...) 的 std::future<>的submit函数

        }
};

测试代码

代码语言:javascript复制
//测试代码
std::random_device rd; // 真实随机数产生器

std::mt19937 mt(rd()); //生成计算随机数mt

std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数

auto rnd = std::bind(dist, mt);

// 设置线程睡眠时间
void simulate_hard_computation()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(2000   rnd()));
}

// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
}

// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
    simulate_hard_computation();
    out = a * b;
    std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回
int multiply_return(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
    return res;
}

void example()
{
    // 创建3个线程的线程池
    ThreadPool pool(3);

    // 初始化线程池
    pool.init();

    // 提交乘法操作,总共30个
    for (int i = 1; i <= 3;   i)
        for (int j = 1; j <= 10;   j)
        {
            pool.submit(multiply, i, j);
        }

    // 使用ref传递的输出参数提交函数
    int output_ref;
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

    // 等待乘法输出完成
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;

    // 使用return参数提交函数
    auto future2 = pool.submit(multiply_return, 5, 3);

    // 等待乘法输出完成
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;

    // 关闭线程池
    pool.shutdown();
}

int main()
{
    example();
    return 0;
}

2单队列

//单任务队列线程池

//在线程池构造时初始化线程数,在xigou时候停止线程池,对外也只需要提供提交任务的接口就好

第一步:线程安全队列

代码语言:javascript复制
//线程安全队列
namespace Diana {

    template<typename T>
    class SafeQueue 
    {
        public:
            void push(const T &item) 
            {
                {
                    std::scoped_lock lock(mtx_);
                    queue_.push(item);
                }
                cond_.notify_one();
            }

            void push(T &&item) 
            {// 两个push方法,此处不是万能引用而是单纯右值
                {
                    std::scoped_lock lock(mtx_);
                    queue_.push(std::move(item));
                }
                cond_.notify_one();
            }

            // template <typename U>
            // void push(U&& item) 
            // {
            //     {
            //         static_assert(std::is_same<U,T>::value==true);
            //         std::scoped_lock lock(mtx_);
            //         queue_.push(std::forward(item));
            //     }
            //     cond_.notify_one();
            // }


            bool pop(T &item) 
            {
                std::unique_lock lock(mtx_);

                cond_.wait(lock, [&]() {
                    return !queue_.empty() || stop_;
                });

                if (queue_.empty())
                    return false;

                item = std::move(queue_.front());
                queue_.pop();
                return true;
            }

            std::size_t size() const 
            {
                std::scoped_lock lock(mtx_);
                return queue_.size();
            }

            bool empty() const 
            {
                std::scoped_lock lock(mtx_);
                return queue_.empty();
            }

            void stop() 
            {
                {
                    std::scoped_lock lock(mtx_);
                    stop_ = true;
                }
                cond_.notify_all();
            }

        private:
            std::condition_variable cond_;
            mutable std::mutex mtx_;
            std::queue<T> queue_;
            bool stop_ = false;
        };

};// namespace Diana

第二步:队列线程池

代码语言:javascript复制
namespace Diana {

    using WorkItem = std::function<void()>;
    // * 简易多线程单任务队列线程池,使用SafeQueue线程安全队列。
    class SimplePool {
    public:
        explicit SimplePool(size_t threads = std::thread::hardware_concurrency()) 
        {
            for (size_t i = 0; i < threads;   i) 
            {
                workers_.emplace_back([this]() {
                    for (;;) 
                    {
                        std::function<void()> task;

                        if (!queue_.pop(task))
                            return;

                        if (task)
                            task();
                    }
                });
            }
        }

        void push(WorkItem item) 
        {
            queue_.push(std::move(item));
        }

        ~SimplePool() 
        {
            queue_.stop();
            for (auto& thd: workers_)
                thd.join();
        }

    private:
        SafeQueue<WorkItem> queue_;
        std::vector<std::thread> workers_;
    };

};// namespace Diana

第三步:测试

代码语言:javascript复制
std::string funA(std::string str) 
{
    return "hello"   str;
}

void test_simple_thread_pool() 
{
    std::cout << "test_simple_thread_pool()" << std::endl;
    Diana::SimplePool threadPool;
    threadPool.push([] { std::cout << "hellon"; });
    // * 此处必须使用shared_ptr进行包装,
    // * 否则在std::function<void()>中会尝试生成std::packaged_task的拷贝构造函数,
    // ! std::packaged_task禁止拷贝操作
    auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world"));
    std::future<std::string> res = task->get_future();
    threadPool.push([task = std::move(task)] { (*task)(); });
    // ! 以下实现方法是错误的
    //  auto task = std::packaged_task<std::string()>(std::bind(funA, "world"));
    //  std::future<std::string> res = task.get_future();
    //  threadPool.enqueue(std::move(task));
    std::cout << res.get() << std::endl;
}

3多队列

//多任务线程队列

//每个线程对应着一个自己的任务队列,当提交一个任务时,可以指定他放到任意一个线程的任务队列

//在用户没有指定任务队列时,就为该任务随即选择一个线程对应的任务队列

代码语言:javascript复制
namespace Diana {

    using WorkItem = std::function<void()>;
    // * 简易多线程多任务队列线程池,使用SafeQueue线程安全队列。
    class MultiplePool {
    public:
        explicit MultiplePool(size_t thread_num = std::thread::hardware_concurrency())
            : queues_(thread_num),thread_num_(thread_num) 
        {
            auto worker = [this](size_t id) {
                while (true) 
                {
                    WorkItem task{};
                    if (!queues_[id].pop(task))
                        break;

                    if (task)
                        task();
                }
            };

            workers_.reserve(thread_num_);
            for (size_t i = 0; i < thread_num_;   i) 
            {
                workers_.emplace_back(worker, i);
            }
        }

        int schedule_by_id(WorkItem fn, size_t id = -1)
        {
            if (fn == nullptr)
                return -1;

            if (id == -1)
            {
                id = rand() % thread_num_;// 随机插入到一个线程的任务队列中
                queues_[id].push(std::move(fn));
            } 
            else 
            {
               // assert(id < thread_num_);// 插入指定线程的任务队列
                queues_[id].push(std::move(fn));
            }

            return 0;
        }

        ~MultiplePool() 
        {
            for (auto& queue: queues_) 
            {
                queue.stop();// 停止每一个任务队列
            }
            for (auto& worker: workers_) 
            {
                worker.join();// 阻塞,等待每个线程执行结束
            }
        }

    private:
        std::vector<Diana::SafeQueue<WorkItem>> queues_;// 每个线程对应一个任务队列
        size_t thread_num_;
        std::vector<std::thread> workers_;
    };

};// namespace Diana

测试

代码语言:javascript复制
std::string funA(std::string str) 
{
    return "hello"   str;
}

//多任务队列
void test_multiple_thread_pool() 
{
    std::cout << "test_multiple_thread_pool" << std::endl;
    Diana::MultiplePool threadPool;
    threadPool.schedule_by_id([] { std::cout << "hellon"; });
    auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world"));
    std::future<std::string> res = task->get_future();
    threadPool.schedule_by_id([task = std::move(task)] { (*task)(); });
    std::cout << res.get() << std::endl;
}
int main()
{
    test_simple_thread_pool();

    test_multiple_thread_pool();
}

文章来源:

https://zhuanlan.zhihu.com/p/367309864

0 人点赞