PS: 整理自极客时间《Java 并发编程》 https://time.geekbang.org/column/intro/159
1. 概述
三种性质
可见性: 一个线程对共享变量的修改, 另一个线程能立刻看到. 缓存可导致可见性问题.
原子性: 一个或多个 CPU 执行操作不被中断. 线程切换可导致原子性问题.
有序性: 编译器优化可能导致指令顺序发生改变. 编译器优化可能导致有序性问题.
三个问题
安全性问题: 线程安全
活跃性问题: 死锁, 活锁, 饥饿
性能问题:
使用无锁结构: TLS,Copy-On-Write, 乐观锁; Java 的原子类, Disruptor 无锁队列
减少锁的持有时间: 让锁细粒度. 如 ConcurrentHashmap; 再如读写锁, 读无锁写有锁
2. Java 内存模型
volatile
C 语言中的原意: 禁用 CPU 缓存, 从内存中读出和写入.
Java 语言的引申义:
Java 会将变量立刻写入内存, 其他线程读取时直接从内存读 (普通变量改变后, 什么时候写入内存是不一定的)
禁止指令重排序
解决问题:
保证可见性
保证有序性
不能保证原子性
Happens-Before 规则 (H-B)
程序顺序性规则: 前面执行的语句对后面语句可见
volatile 变量规则: volatile 变量的写操作对后续的读操作可见
传递性规则: A H-B B,B H-B C, 那么 A H-B C
管程中锁的规则: 对一个锁的解锁 H-B 于 后续对这个锁的加锁
3. 互斥锁 sychronized
锁对象: 非静态 this, 静态 Class, 括号 Object 参数
预防死锁:
互斥: 不能破坏
占有且等待: 同时申请所有资源
不可抢占: sychronized 解决不了, Lock 可以解决
循环等待: 给资源设置 id 字段, 每次都是按顺序申请锁
等待通知机制:
- wait,notify,notifyAll
- class Allocator {
- private List<Object> als;
- // 一次性申请所有资源
- synchronized void apply(
- Object from, Object to){
- // 经典写法
- while(als.contains(from) ||
- als.contains(to)){
- try{
- wait();
- }catch(Exception e){
- }
- }
- als.add(from);
- als.add(to);
- }
- // 归还资源
- synchronized void free(
- Object from, Object to){
- als.remove(from);
- als.remove(to);
- notifyAll();
- }
- }
4. 线程的生命周期
通用线程的生命周期:
Java 线程的生命周期:
状态流转:
RUNNABLE -- BLOCKED: 线程获取和等待 sychronized 隐式锁
ps: 调用阻塞式 API 时, 不会进入 BLOCKED 状态, 但对于操作系统而言, 线程实际上进入了休眠态, 只不过 JVM 不关心.
- RUNNABLE -- WAITING:
- Object.wait()
- Thread.join()
- LockSupport.park()
RUNNABLE -- TIMED-WAITING: 调用各种带超时参数的线程方法
NEW -- RUNNABLE:Thread.start()
RUNNABLE -- TERMINATED: 线程运行完毕, 有异常抛出, 或手动调用线程 stop()
6. 线程的性能指标
延迟: 发出请求到收到响应
吞吐量: 单位时间内处理的请求数量
最佳线程数:
CPU 密集型: 线程数 = CPU 核数 + 1
IO 密集型: 线程数 = (IO 耗时 / CPU 耗时 + 1)* CPU 核数
7. JDK 并发包
Lock:lock,unlock
互斥锁, 和 sychronized 一样的功能, 里面能保证可见性
Condition:await,signal
条件, 相比于 sychronized 的 Object.wait,Condition 可以实现多条件唤醒等待机制
Semaphore:acquire,release
信号量, 可以用来实现多个线程访问一个临界区, 如实现对象池设计中的限流器
ReadWriteLock:readLock,writeLock
写锁, 读锁, 允许多线程读, 一个线程写, 写锁持有时所有读锁和写锁的获取都阻塞 (写锁的获取要等所有读写锁释放)
适用于读多写少的场景
StampedLock:tryOptimisticRead,validate
写锁, 读锁 (分悲观读锁, 乐观读锁):
线程同步:
CountDownLatch: 一个线程等待多个线程
初始化 --> countDown(减 1) --> await(等待为 0)
CyclicBarrier: 一组线程之间相互等待
初始化 --> 设置回调函数 (为 0 时执行, 并返回原始值) --> await(减 1 并等待为 0)
并发容器:
List:
CopyOnWriteArrayList: 适用写少的场景, 要容忍可能的读不一致
Map:
ConcurrentHashMap: 分段锁
ConcurrentSkipListMap: 跳表
Set:
CopyOnWriteArraySet: 同上
ConcurrentSkipListSet: 同上
Queue:
分类: 阻塞 Blocking, 单端 Queue, 双端 Deque
单端阻塞 (BlockingQueue):Array~,Linked~,Sychronized~,LinkedTransfer~,Priority~,Delay~
双端阻塞 (BlockingDeque):Linked~
单端非阻塞 (Queue):ConcurrentLinked~
双端非阻塞 (Deque):ConcurrentLinked~
原子类:
无锁方案原理: 增加了硬件支持, 即 CPU 的 CAS 指令
ABA 问题: 有解决 ABA 问题的需求时, 增加一个递增的版本号纬度化解
分类: 原子化基本数据类型, 原子化引用类型, 原子化数组, 原子化对象属性更新器, 原子化累加器
- Future:
- Future:cancel,isCanceled,isDone,get
FutureTask: 实现了 Runnable 和 Future 接口
强大工具类
CompletableFuture: 一个强大的异步编程工具类 (任务之间有聚合关系), 暂时略
CompletionService: 批量并行任务, 暂时略
8. 线程池
设计原理:
用生产者消费者模型, 线程池是消费者, 调用者是生产者.
线程池对象里维护一个阻塞队列, 一个已经跑起来的工作线程组 ThreadsList
ThreadList 里面循环从队列中去 Runnable 任务, 并调用 run 方法
- // 简化的线程池, 仅用来说明工作原理
- class MyThreadPool{
- // 利用阻塞队列实现生产者 - 消费者模式
- BlockingQueue<Runnable> workQueue;
- // 保存内部工作线程
- List<WorkerThread> threads
- = new ArrayList<>();
- // 构造方法
- MyThreadPool(int poolSize,
- BlockingQueue<Runnable> workQueue){
- this.workQueue = workQueue;
- // 创建工作线程
- for(int idx=0; idx<poolSize; idx++){
- WorkerThread work = new WorkerThread();
- work.start();
- threads.add(work);
- }
- }
- // 提交任务
- void execute(Runnable command){
- workQueue.put(command);
- }
- // 工作线程负责消费任务, 并执行任务
- class WorkerThread extends Thread{
- public void run() {
- // 循环取任务并执行
- while(true){ 1
- Runnable task = workQueue.take();
- task.run();
- }
- }
- }
- }
- /** 下面是使用示例 **/
- // 创建有界阻塞队列
- BlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<>(2);
- // 创建线程池
- MyThreadPool pool = new MyThreadPool(
- 10, workQueue);
- // 提交任务
- pool.execute(()->{
- System.out.println("hello");
- });
- ThreadPoolExcutor
参数
corePoolSize: 线程池保有的最小线程数
maximumPoolSize: 线程池创建的最大线程数
keepAliveTime: 工作线程多久没收到任务, 被认为是闲的
workQueue: 工作队列
threadFactory: 通过这个参数自定义如何创建线程
handler: 任务拒绝策略
默认为 AbortPolicy, 会抛出 RejectedExecutionException, 这是个运行时异常, 要注意
方法
- void execute()
- Future submit(Runnable task | Callable task)
9. 鸟瞰并行任务分类
来源: https://www.cnblogs.com/flashsun/p/10776168.html