原因排查
经过一个多小时的代码排查终于查明了线上程序线程数过多的原因: 这是一个接收 mq 消息的一个服务, 程序大体思路是这样的, 监听的线程每次收到一条消息, 就启动一个线程去执行, 每次启动的线程都是新的. 说到这里, 咱们就谈一谈这个程序有哪些弊端呢:
每次收到一条消息都创建一个新的线程, 要知道线程的资源对于系统来说是很昂贵的, 消息处理完成还要销毁这个线程.
这个程序用到的线程数量是没有限制的. 当线程到达一定数量, 程序反而因线程在 CPU 切换开销的原因处理效率降低. 无论的你的服务器 CPU 是多少核心, 这个现象都有发生的可能.
解决问题
线程多的问题该怎么解决呢, 增加 CPU 核心数? 治标不治本. 对于开发者而言, 最为常用也最为有效的是线程池化, 也就是说线程池.
线程池是一种多线程处理形式, 处理过程中将任务添加到队列, 然后在创建线程后自动启动这些任务. 这避免了在处理短时间任务时创建与销毁线程的代价. 线程池不仅能够保证内核的充分利用, 还能防止过分调度. 可用线程数量应该取决于可用的并发处理器, 处理器内核, 内存, 网络 sockets 等的数量. 例如, 线程数一般取 CPU 数量 + 2 比较合适, 线程数过多会导致额外的线程切换开销.
线程池其中一项很重要的技术点就是任务的队列, 队列虽然属于一种基础的数据结构, 但是发挥了举足轻重的作用.
队列
队列是一种特殊的线性表, 特殊之处在于它只允许在表的前端 (front) 进行删除操作, 而在表的后端 (rear) 进行插入操作, 和栈一样, 队列是一种操作受限制的线性表. 进行插入操作的端称为队尾, 进行删除操作的端称为队头.
队列是一种采用的 FIFO(first in first out)方式的线性表, 也就是经常说的先进先出策略.
实现
数组 队列可以用数组 Q[1...m]来存储, 数组的上界 m 即是队列所容许的最大容量. 在队列的运算中需设两个指针: head, 队头指针, 指向实际队头元素 + 1 的位置; tail, 队尾指针, 指向实际队尾元素位置. 一般情况下, 两个指针的初值设为 0, 这时队列为空, 没有元素. 以下为一个简单的实例(生产环境需要优化):
- public class QueueArray<T>
- {
- // 队列元素的数组容器
- T[] container = null;
- int IndexHeader, IndexTail;
- public QueueArray(int size)
- {
- container = new T[size];
- IndexHeader = 0;
- IndexTail = 0;
- }
- public void Enqueue(T item)
- {
- // 入队的元素放在头指针的指向位置, 然后头指针前移
- container[IndexHeader] = item;
- IndexHeader++;
- }
- public T Dequeue()
- {
- // 出队: 把尾元素指针指向的元素取出并清空 (不清空也可以) 对应的位置, 尾指针前移
- T item = container[IndexTail];
- container[IndexTail] = default(T);
- IndexTail++;
- return item;
- }
- }
链表 队列采用的 FIFO(first in first out), 新元素总是被插入到链表的尾部, 而读取的时候总是从链表的头部开始读取. 每次读取一个元素, 释放一个元素. 所谓的动态创建, 动态释放. 因而也不存在溢出等问题. 由于链表由元素连接而成, 遍历也方便. 以下是一个实例仅供参考:
- public class QueueLinkList<T>
- {
- LinkedList<T> contianer = null;
- public QueueLinkList()
- {
- contianer = new LinkedList<T>();
- }
- public void Enqueue(T item)
- {
- // 入队的元素其实就是加入到队尾
- contianer.AddLast(item);
- }
- public T Dequeue()
- {
- // 出队: 取链表第一个元素, 然后把这个元素删除
- T item = contianer.First.Value;
- contianer.RemoveFirst();
- return item;
- }
- }
- // 线程池
- public class ThreadPool
- {
- bool PoolEnable = false; // 线程池是否可用
- List<Thread> ThreadContainer = null; // 线程的容器
- ConcurrentQueue<ActionData> JobContainer = null; // 任务的容器
- public ThreadPool(int threadNumber)
- {
- PoolEnable = true;
- ThreadContainer = new List<Thread>(threadNumber);
- JobContainer = new ConcurrentQueue<ActionData>();
- for (int i = 0; i <threadNumber; i++)
- {
- var t = new Thread(RunJob);
- ThreadContainer.Add(t);
- t.Start();
- }
- }
- // 向线程池添加一个任务
- public void AddTask(Action<object> job,object obj, Action<Exception> errorCallBack=null)
- {
- if (JobContainer != null)
- {
- JobContainer.Enqueue(new ActionData { Job = job, Data = obj , ErrorCallBack= errorCallBack });
- }
- }
- // 终止线程池
- public void FinalPool()
- {
- PoolEnable = false;
- JobContainer = null;
- if (ThreadContainer != null)
- {
- foreach (var t in ThreadContainer)
- {
- // 强制线程退出并不好, 会有异常
- //t.Abort();
- t.Join();
- }
- ThreadContainer = null;
- }
- }
- private void RunJob()
- {
- while (true&& JobContainer!=null&& PoolEnable)
- {
- // 任务列表取任务
- ActionData job=null;
- JobContainer?.TryDequeue(out job);
- if (job == null)
- {
- // 如果没有任务则休眠
- Thread.Sleep(10);
- continue;
- }
- try
- {
- // 执行任务
- job.Job.Invoke(job.Data);
- }
- catch(Exception error)
- {
- // 异常回调
- job?.ErrorCallBack(error);
- }
- }
- }
- }
- public class ActionData
- {
- // 执行任务的参数
- public object Data { get; set; }
- // 执行的任务
- public Action<object> Job { get; set; }
- // 发生异常时候的回调方法
- public Action<Exception> ErrorCallBack { get; set; }
- }
- ThreadPool pool = new ThreadPool(100);
- for (int i = 0; i <5000; i++)
- {
- pool.AddTask((obj) =>
- {
- Console.WriteLine($"{obj}__{System.Threading.Thread.CurrentThread.ManagedThreadId}");
- }, i, (e) =>
- {
- Console.WriteLine(e.Message);
- });
- }
- pool.FinalPool();
- Console.Read();
来源: https://juejin.im/post/5c8626c4f265da2de33f5b5b