目录
线程池的实现
前言
线程池的概念
使用原因及适用场合
线程池的实现原理
程序测试
线程池的实现
前言
初学 C++, 想封装点常用的 C++ 类, 已经写好了 mutex,cond,thread 的类, 想用起来写点东西, 于是就决定写线程池了, 这里拙笔记录下学习笔记.
本文主要内容包括: 线程池的概念, 使用原因, 适用场景, 线程池的实现, 任务调度逻辑, 样例测试.
线程池的概念
线程池是指在一个多线程程序中创建一个线程集合, 在执行新的任务的时候不是新建一个线程, 而是使用线程池中已创建好的线程, 一旦任务执行完毕, 线程就会休眠等待新的任务分配下来, 线程池中的线程数量取决于机器给进程所能分配的内存大小, 以及应用程序的需求.
使用原因及适用场合
1. 在服务器端编程中, 最原始的方法我们使用顺序化的结构, 一个服务器只能处理一个客户, 如果同时 2 个客户端链接上来了, 服务器只能先处理了先到达的那个个, 这样第二个客户端只能等了, 影响客户的响应时间. 它只适用于客户量少的短连接. 这时候有方案 2.
2. 在多线程服务器端编程中, 一个服务器如果要处理多条链接的客户端, 当链接很少的时候我们可以每来一条链接创建一个线程. 但当并发量很大的时候呢, 不停地的增加线程, 在某个时间计算机资源可能耗尽. 于是有了方案 3
3. 为了弥补方案 2 中每个请求创建线程的缺陷, 我们使用固定大小线程池, 全部 IO 交给 IO 复用线程解决(本文不涉及), 而任务计算交给线程池. 如果任务彼此独立, IO 压力不大, 那么这种方案非常适合.
当然服务器模型远不止这 3 种, 还有很多方案, 本文不涉.
线程池的实现原理
线程池类主要维系两个队列: 任务队列, 线程队列
线程池通过 take 方法从线程队列提取任务, 到一个线程中去执行; 有任务就提取执行, 无任务则阻塞线程休眠.
任务队列可以单独写个任务类出来, 也可以写个任务类基类, 预留虚任务函数接口, 继承下来泛化.
当然最便利的方法就是直接用函数地址来做任务咯.
typedef void (*Task)(void);
线程队列 线程队列通过我自己写的线程类实现.
- #include <pthread.h>
- class Thread{
- public:
- typedef void (*threadFun_t)(void *arg);
- explicit Thread(const threadFun_t &threadRoutine, void *arg);
- ~Thread();
- void start();
- void join();
- static void *threadGuide(void *arg);
- pthread_t getThreadId() const{
- return m_threadId;
- }
- private:
- pthread_t m_threadId;
- bool m_isRuning;
- threadFun_t m_threadRoutine;
- void *m_threadArg;
- };
- Thread::Thread(const threadFun_t &threadRoutine, void *arg)
- :m_isRuning(false),
- m_threadId(0),
- m_threadRoutine(threadRoutine),
- m_threadArg(arg){
- }
- Thread::~Thread(){
- if(m_isRuning){// 如果线程正在执行, 则分离此线程.
- CHECK(!pthread_detach(m_threadId));
- }
- }
- void *Thread::threadGuide(void *arg){
- Thread *p = static_cast<Thread *>(arg);
- p->m_threadRoutine(p->m_threadArg);
- return NULL;
- }
- void Thread::join(){
- VERIFY(m_isRuning);
- CHECK(!pthread_join(m_threadId, NULL));
- m_isRuning = false;
- }
- void Thread::start(){
- pthread_attr_t attr;
- CHECK(!pthread_attr_init(&attr));
- //CHECK(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); //set thread separation state property
- CHECK(!pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)); //Set thread inheritance
- CHECK(!pthread_attr_setschedpolicy(&attr, SCHED_OTHER)); //set thread scheduling policy
- CHECK(!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)); //Set thread scope
- CHECK(!pthread_create(&m_threadId, &attr, threadGuide, this));
- m_isRuning = true;
- }{
- MutexLockGuard lock(m_mutex);
- m_isRuning = false;
- }
- }
构造传入带一个空类型参数指针(
为什么要带这个空类型指针后面会提
)函数指针, 通过 start()方法创建线程, 然后执行 threadGuide()方法调用构造时候传入的函数指针执行咱们想运行的函数实现 Thread 类.
这两个队列在线程池中的定义如下:
- private:
- std::vector<Thread *> m_threads;
- std::deque<Task> m_tasks;
任务调度逻辑
任务分配逻辑主要靠两个条件变量实现,(条件变量本文不做详述)
1. 任务队列是否空.
2. 任务队列是否满.
其逻辑如下图所示:
start()方法是线程池的运行方法. 通过它创建线程池.
threadRoutine()就是我们就是线程池中创建的线程,
线程跑起来后, 通过 isRunning 控制线程循环是否退出.
stop()方法关闭线程池, 回收资源.
循环中判断 : 有任务则执行, 无任务则 wait 阻塞等待.
- void ThreadPool::start(){
- m_isRuning = true;
- m_threads.reserve(m_threadsSize);
- for(size_t i = 0; i <m_threadsSize; i++){
- m_threads.push_back(new Thread(threadRoutine, this));
- m_threads[i]->start();
- }
- }
Thread(threadRoutine, this) 这里就是为什么我线程类要带一个无符号类型指针参数的原因, 因为静态函数无法调用 c++ 的类成员函数 (主要原因是类在编译期间未实例化没有明确的地址.) 我们只能通过线程池对象的 this 指针调用它的成员.
任务调度的源码实现:
- ThreadPool::ThreadPool(size_t tasksSize, size_t threadsSize)
- :m_tasksSzie(tasksSize),
- m_threadsSize(threadsSize),
- m_mutex(),
- m_tasksEmpty(m_mutex),
- m_tasksFull(m_mutex),
- m_isRuning(false){
- }
- ThreadPool::~ThreadPool(){
- if(m_isRuning){
- stop();
- }
- }
- void ThreadPool::threadRoutine(void *arg){
- ThreadPool *p = static_cast<ThreadPool *>(arg);
- while(p->m_isRuning){
- ThreadPool::Task task(p->take());
- if(task){
- task();
- }
- }
- }
- ThreadPool::Task ThreadPool::take(){
- MutexLockGuard lock(m_mutex);
- while(m_tasks.empty() && m_isRuning){
- m_tasksEmpty.wait();
- }
- if(!m_tasks.empty()){
- Task task = m_tasks.front();
- m_tasks.pop_front();
- m_tasksFull.notify();
- return task;
- }
- return NULL;
- }
- void ThreadPool::addTask(Task task){
- if(m_threads.empty()){// 如果线程池是空的, 直接跑任务.
- task();
- }
- else{
- MutexLockGuard lock(m_mutex);
- while(m_tasksSzie> 0 && m_tasks.size()>= m_tasksSzie){
- m_tasksFull.wait();
- }
- m_tasks.push_back(task);
- m_tasksEmpty.notify();
- }
- }
- void ThreadPool::start(){
- m_isRuning = true;
- m_threads.reserve(m_threadsSize);
- for(size_t i = 0; i <m_threadsSize; i++){
- m_threads.push_back(new Thread(threadRoutine, this));
- m_threads[i]->start();
- }
- }
- void ThreadPool::stop(){
- {
- MutexLockGuard lock(m_mutex);
- m_isRuning = false;
- m_tasksEmpty.notifyAll();
- }
- for(int i = m_threadsSize - 1; i>= 0; i--){
- m_threads[i]->join();
- delete(m_threads[i]);
- m_threads.pop_back();
- }
- }
程序测试
测试代码:
创建一个有两个线程, 拥有 5 个任务的任务队列, 执行 8 个加数任务.
- #include <stdio.h>
- #include <stdlib.h>
- #include <pthread.h>
- #include "MutexLock.hh"
- #include "Thread.hh"
- #include <unistd.h>
- #include "Condition.hh"
- #include "ThreadPool.hh"
- #include <vector>
- //threadPool test
- MutexLock CntLock;
- int cnt = 0;
- void test(void){
- unsigned long i = 0xfffffff;
- //MutexLockGuard loo(CntLock);
- //CntLock.lock();
- while(i--);
- printf("%d\n", ++cnt);
- //CntLock.unlock();
- sleep(1);
- }
- int main()
- {
- //ThreadPool Test
- ThreadPool tp(5, 2);
- tp.start();
- sleep(3);
- for(int i = 0; i < 8; i++)
- tp.addTask(test);
- getchar();
- return 0;
- }
简单 test 结果:
- thread 140068496353024 run task
- thread 140068504745728 run task
- 1
- 2
- thread 140068504745728 run task
- thread 140068496353024 run task
- 3
- 4
- thread 140068496353024 run task
- thread 140068504745728 run task
- 5
- 6
- thread 140068496353024 run task
- thread 140068504745728 run task
- 7
- 8
来源: https://www.cnblogs.com/ailumiyana/p/9402761.html