线程池是一种多线程处理形式, 初始创建多个线程, 初始线程处于 wait 状态. 处理过程中将任务添加到队列中, 按照队列顺序依次处理, 此时线程处于 work 状态自动启动这些任务. 线程任务处理完后继续处理队列中待执行任务, 最后完成所有任务放回至线程池统一销毁. 线程池线程都是后台线程, 适用于连续产生大量并发任务的场合. 每个线程都使用默认的堆栈大小, 以默认的优先级运行, 并处于多线程单元中.
线程池(英语: thread pool): 一种线程 https://baike.baidu.com/item/线程 使用模式. 线程过多会带来调度开销, 进而影响缓存局部性和整体性能. 而线程池维护着多个线程, 等待监督管理者分配可并发执行的任务. 线程池不仅避免了在处理短时间任务时创建与销毁线程的代价, 还能够保证内核的充分利用, 防止过分调度.
1, 初始化线程池: 创建初始的任务链表 (队列) 及 N 个线程, 初始化条件变量;
2, 任务链表管理: 任务存放至链表中, 记录当前待处理任务量, 并存放未处理的任务, 为线程池提供一种缓冲机制;
当任务链表头为空, 则表示无待处理任务, 可等待或销毁线程池; 当任务链表头指向非空, 线程执行待任务, 任务链表头下移, 等待任务 - 1.
3, 线程池管理器(ThreadPoolManager): 监测并管理线程池中线程状态,
线程状态管理: 1)wait -- 线程池处于非关闭, 当前无任务, 线程关闭;
2)work -- 线程开启, 执行待处理任务(process), 任务链表头下移, 等待任务 - 1;
4, 线程池销毁: 当任务全部完成或接收到销毁指令, 销毁等待链表及所有线程, 销毁后指针置空.
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <pthread.h>
- typedef struct task
- {
- void *(*process) (void *arg);
- void *arg;
- struct task *next;
- } Cthread_task;
- /* 线程池结构 */
- typedef struct
- {
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_ready;
- /* 链表结构, 线程池中所有等待任务 7*/
- Cthread_task *queue_head;
- /* 是否销毁线程池 */
- int shutdown;
- pthread_t *threadid;
- /* 线程池中线程数目 3*/
- int max_thread_num;
- /* 当前等待的任务数 */
- int cur_task_size;
- } Cthread_pool;
- static Cthread_pool *pool = NULL;
- void *thread_routine (void *arg);
- void pool_init (int max_thread_num)
- {
- int i = 0;
- pool = (Cthread_pool *) malloc (sizeof (Cthread_pool));
- pthread_mutex_init (&(pool->queue_lock), NULL);
- /* 初始化条件变量 */
- pthread_cond_init (&(pool->queue_ready), NULL);
- pool->queue_head = NULL; // 等待任务链表为空
- pool->max_thread_num = max_thread_num;
- pool->cur_task_size = 0;
- pool->shutdown = 0;
- pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
- for (i = 0; i <max_thread_num; i++) //3
- {
- pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL); // 创建线程
- }
- }
- /* 向线程池中加入任务 */
- int pool_add_task (void *(*process) (void *arg), void *arg)
- {
- /* 构造一个新任务 初始化 */
- Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task));
- task->process = process;
- task->arg = arg;
- task->next = NULL;
- pthread_mutex_lock (&(pool->queue_lock));
- /* 将任务加入到等待队列中 加到链表尾部或空成员中 */
- Cthread_task *member = pool->queue_head;
- if (member != NULL)
- {
- while (member->next != NULL)
- member = member->next;
- member->next = task;
- }
- else
- {
- pool->queue_head = task;
- }
- pool->cur_task_size++;
- pthread_mutex_unlock (&(pool->queue_lock));
- pthread_cond_signal (&(pool->queue_ready)); // 唤醒线程
- return 0;
- }
- /* 销毁线程池, 等待队列中的任务不会再被执行, 但是正在运行的线程会一直
- 把任务运行完后再退出 */
- int pool_destroy ()
- {
- if (pool->shutdown)
- return -1;/* 防止两次调用 */
- pool->shutdown = 1;
- /* 唤醒所有等待线程, 线程池要销毁了 */
- pthread_cond_broadcast (&(pool->queue_ready));
- /* 阻塞等待线程退出, 否则就成僵尸了 */
- int i;
- for (i = 0; i <pool->max_thread_num; i++)
- pthread_join (pool->threadid[i], NULL);
- free (pool->threadid);
- /* 销毁等待队列 */
- Cthread_task *head = NULL;
- while (pool->queue_head != NULL)
- {
- head = pool->queue_head;
- pool->queue_head = pool->queue_head->next;
- free (head);
- }
- /* 条件变量和互斥量也别忘了销毁 */
- pthread_mutex_destroy(&(pool->queue_lock));
- pthread_cond_destroy(&(pool->queue_ready));
- free (pool);
- /* 销毁后指针置空是个好习惯 */
- pool=NULL;
- return 0;
- }
- void * thread_routine (void *arg)
- {
- printf ("starting thread 0x%x\n", pthread_self ());
- while (1)
- {
- pthread_mutex_lock (&(pool->queue_lock));
- while (pool->cur_task_size == 0 && !pool->shutdown) // 当前没有任务, 线程池处于非关闭
- {
- printf ("thread 0x%x is waiting\n", pthread_self ());
- pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); // 线程等待
- }
- /* 线程池要销毁了 */
- if (pool->shutdown)
- {
- /* 遇到 break,continue,return 等跳转语句, 千万不要忘记先解锁 */
- pthread_mutex_unlock (&(pool->queue_lock));
- printf ("thread 0x%x will exit\n", pthread_self ());
- pthread_exit (NULL);
- }
- printf ("thread 0x%x is starting to work\n", pthread_self ());
- /* 待处理任务减 1, 并取出链表中的头元素 */
- pool->cur_task_size--; // 等待任务 - 1
- Cthread_task *task = pool->queue_head; // 取第一个任务处理
- pool->queue_head = task->next; // 链表头指向下一个任务
- pthread_mutex_unlock (&(pool->queue_lock));
- /* 调用回调函数, 执行任务 */
- (*(task->process)) (task->arg);
- free (task);
- task = NULL;
- }
- /* 这一句应该是不可达的 */
- pthread_exit (NULL);
- }
- void * myprocess (void *arg)
- {
- printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
- sleep (1);/* 休息一秒, 延长任务的执行时间 */
- return NULL;
- }
- int main (int argc, char **argv)
- {
- pool_init (3);/* 创建线程池, 线程池中最多三个活动线程 */
- /* 连续向池中投入 10 个任务 */
- int *workingnum = (int *) malloc (sizeof (int) * 10);
- int i;
- for (i = 0; i < 10; i++)
- {
- workingnum[i] = i;
- pool_add_task (myprocess, &workingnum[i]);
- }
- /* 等待所有任务完成 */
- sleep (5);
- /* 销毁线程池 */
- pool_destroy ();
- free (workingnum);
- return 0;
- }
来源: http://www.bubuko.com/infodetail-3109179.html