跳转至

线程池

简介

线程池(thread pool): 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。

实现

实现一个简单的懒汉单例模式的线程池。

这里线程池内部是一个线程数组 std::vector<std::thread> 用做管理线程和一个自己实现的循环队列 ::ring_queue<::thread_task> 做任务队列。

简单封装了一个 thread_task 用来维护线程执行的任务,其内部维护一个可调用对象及其参数。

线程池提供添加任务和启动的功能。在启动后,空闲线程会自动从任务队列中取任务并执行,直到任务队列为空,则停止。

线程池
#pragma once
#include <unistd.h>
#include <thread>
#include <vector>
#include <iostream>
#include <functional>
#include <tuple>
#include "ring_queue.hpp"

// 生成 L ~ (R - 1) 的 元组索引
template<std::size_t ...N>
class tuple_indices {};

template<std::size_t L,std::size_t R,std::size_t ..._Ind>
class make_indices : public make_indices<L,R - 1,R - 1,_Ind...> {};

template<std::size_t L,std::size_t ..._Ind>
class make_indices<L,L,_Ind...> : public tuple_indices<_Ind...> {};


// 编译期求元组大小
template<class Tp>
struct tuple_size{ };

template<class ...Args>
struct tuple_size<std::tuple<Args...>>
{
    // 编译期求值表达式
    static constexpr std::size_t value = sizeof...(Args);
};


/**
* @brief 线程任务,包装任意可调用对象及其参数,禁止拷贝,只能移动。
*        只是简单的封装,不能获得函数执行返回值,不支持引用语义。
*/
class thread_task
{
    // 对回调函数的包装,用于处理任意类型的可调用对象机器参数,并在需要时执行他们
    struct _m_invoke_impl_base
    { virtual void run() = 0; };

    using self = thread_task;
    using _invoker_ptr = std::unique_ptr<_m_invoke_impl_base>;


    template<class _Tuple>
    struct _m_invoke_impl : public _m_invoke_impl_base
    {   
        _Tuple __m_t;   // 存可调用对象和参数包的元组

        template<class Callable,class ...Args>
        _m_invoke_impl(Callable&& __f,Args&&... __args)
            :__m_t(
                std::tuple<Callable,Args...>(
                    __f,std::forward<Args>(__args)...
                )
            )
        { }

        // 调用 __f(__args...)
        template<size_t ..._Ind>
        void _m_invoke(tuple_indices<_Ind...>)
        {
            std::get<0>(__m_t)(std::get<_Ind>(std::move(__m_t))...);
        }

        // 生成 tuple 索引包,并调用函数
        virtual void run()
        { 
            _m_invoke(make_indices<
                1, tuple_size<decltype(__m_t)>::value /*1 ~ (N - 1) 的索引包*/
            >()); 
        }
    };

    _invoker_ptr _m_impl;   // 指向具体的任务对象(基类指针指向子类对象实现多态)


public:
    thread_task() = default;

    template<class Callable,class ...Args>
    thread_task(Callable&& func,Args&&... args)
        :_m_impl(
            new _m_invoke_impl<std::tuple<Callable,Args...>>(
                func,std::forward<Args>(args)...
            )
        )
    { }

    thread_task(const self&) = delete;
    self& operator=(const self&) = delete;

    thread_task(self&&) = default;
    self& operator=(self&&) = default;

    // 执行任务
    void run()
    { _m_impl->run(); }
};


/**
* @brief 懒汉单例模式的线程池
*/
class thread_pool
{
private:
    std::vector<std::thread> _pool; // 线程池
    ::ring_queue<thread_task> _task_que; // 任务队列

    // 实现线程安全的单例模式
    static std::mutex _single_mt;
    static thread_pool* _single;

    void _run()
    {
        for(;;)
        { _task_que.get().run(); /* 从线程队列区取任务 */ }
    }

    thread_pool(size_t thread_pool_size,size_t task_que_size)
        :_pool(thread_pool_size)
        ,_task_que(task_que_size)
    { }

public:

    // 单例模式
    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;

    // 创建单例线程池,要保证线程安全
    static thread_pool* 
        get_pool(size_t thread_pool_size,size_t task_que_size) noexcept
    {
        // 如果以及创建成功,反复申请锁效率低下,这里直接返回
        if(_single != nullptr) return _single;
        std::unique_lock<std::mutex> lg(_single_mt);
        // 保证第一次创建线程池时,只有一个线程成功申请
        if(_single != nullptr) return _single;
        return _single = new thread_pool(thread_pool_size,task_que_size);
    }

    /**
    * @brief 启动线程池
    */
    void start() noexcept
    {
        for(auto&td:_pool)
            td = std::thread(std::bind(&thread_pool::_run,this));
    }

    /**
    * @brief 向线程池任务队列中加入新任务
    * @param new_task 新的任务
    */
    void add_task(::thread_task&& new_task)
    {
        _task_que.put(std::move(new_task));
    }

    /**
    * @brief 通过可调用对象及其参数构造新的线程任务
    * @param _func 可调用对象
    * @param ..._args 参数
    */
    template<class Callable,class ...Args>
    void add_task(Callable&& _func,Args&&... _args)
    {
        _task_que.put(::thread_task(_func,std::forward<Args>(_args)...));
    }
};


thread_pool* thread_pool::_single = nullptr;
std::mutex thread_pool::_single_mt;
#pragma once
#include <iostream>
#include <vector>
#include <thread>
#include <pthread.h>
#include <semaphore.h>
#include <mutex>

/**
*  @brief 线程安全的循环队列。
*/
template<class Tp>
class ring_queue
{
    std::vector<Tp> _rq;  // 数组实现循环队列
    int _p_pos;     // 生产者当前可以放数据的位置
    int _c_pos;     // 消费者当前可以取的数据位置
    int _capcity;   // 队列容量 
    std::mutex _p_mt;   // 实现生产者间的互斥
    std::mutex _c_mt;   // 实现消费者间的互斥
    sem_t _p_sem;   // 剩余空间
    sem_t _c_sem;   // 队列中的数据量

    int P(sem_t& _sem) 
    { return sem_wait(&_sem); }

    int V(sem_t& _sem)
    { return sem_post(&_sem); }

public:
    ring_queue(int n)
        :_rq(n)
        ,_p_pos(0)
        ,_c_pos(0)
    {
        sem_init(&_p_sem,0,n);
        sem_init(&_c_sem,0,0);
        _capcity = n;
    }

    ring_queue(const ring_queue&) = delete;

    void put(const Tp& in)
    {
        P(_p_sem); // 先申请空间
        _p_mt.lock();   // 放数据时加锁
        _rq[_p_pos] = in;
        (_p_pos += 1) %= _capcity;
        _p_mt.unlock();
        V(_c_sem);
    }

    void put(Tp&& in)
    {
        P(_p_sem); // 先申请空间
        _p_mt.lock();   // 放数据时加锁
        _rq[_p_pos] = std::move(in);
        (_p_pos += 1) %= _capcity;
        _p_mt.unlock();
        V(_c_sem);
    }

    Tp get() 
    {
        P(_c_sem); // 先申请数据
        _c_mt.lock();   // 取数据时加锁
        Tp ret = std::move(_rq[_c_pos]);
        (_c_pos += 1) %= _capcity;
        _c_mt.unlock(); 
        V(_p_sem);
        return ret;
    }

    ~ring_queue()
    {
        sem_destroy(&_c_sem);
        sem_destroy(&_p_sem);
    }
};