多线程进阶 => JUC 并发编程
1, 什么是 JUC
源码 + 官方文档 面试高频问!
java.util 工具包, 包, 分类
业务: 普通的线程代码 Thread
Runnable 没有返回值, 效率相比于 Callable 相对较低!
2, 线程和进程
- public synchronized void start() {
- /**
- * This method is not invoked for the main method thread or "system"
- * group threads created/set up by the VM. Any new functionality added
- * to this method in the future may have to also be added to the VM.
- *
- * A zero status value corresponds to state "NEW".
- */
- if (threadStatus != 0)
- throw new IllegalThreadStateException();
- /* Notify the group that this thread is about to be started
- * so that it can be added to the group's list of threads
- * and the group's unstarted count can be decremented. */
- group.add(this);
- boolean started = false;
- try {
- start0();
- started = true;
- } finally {
- try {
- if (!started) {
- group.threadStartFailed(this);
- }
- } catch (Throwable ignore) {
- /* do nothing. If start0 threw a Throwable then
- it will be passed up the call stack */
- }
- }
- }
- // 本地方法, 底层的 C++ , java 无法直接操作硬件
- private native void start0();
- package com.kuang;
- public class demo1 {
- public static void main(String[] args) {
- // 获取 CPU 的核数
- // CPU 密集型, IO 密集型
- System.out.println(Runtime.getRuntime().availableProcessors());
- }
- }
并发编程的本质: 充分利用 CPU 的资源
所有的公司都很看重!
企业, 挣钱 => 提高效率, 裁员, 找一个厉害的人顶替三个不怎么样的人;
人员 (减), 技术成本 (高)
2.1, 线程有几个状态
- public enum State {
- // 新生
- NEW,
- // 运行
- RUNNABLE,
- // 阻塞
- BLOCKED,
- // 等待, 死死地等
- WAITING,
- // 超时等待, 等不下去了就不等
- TIMED_WAITING,
- // 终止
- TERMINATED;
- }
wait/sleep 区别
来自不同的类
- wait => Object
- sleep => Thread
企业当中, 休眠, TimeUnit 类
关于锁的释放
wait 会释放锁
sleep 睡觉了, 抱着锁睡觉, 不会释放锁!
使用的范围是不同的
wait 必须在同步代码块中
sleep 可以在任何地方睡
是否需要捕获异常 (二者都需要捕获异常)
wait 不需要捕获异常
sleep 必须要捕获异常
3,Lock 锁 (重点)
3.1, 传统 synchronized
3.2,Lock 接口
公平锁: 十分公平, 可以先来后到
非公平锁: 十分不公平, 可以插队 (默认)
- package com.kuang;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- public class SaleTicketDemo {
- public static void main(String[] args) {
- Ticket ticket = new Ticket();
- new Thread(() -> { for (int i = 0; i <30; i++) ticket.sale(); }, "A").start();
- new Thread(() -> { for (int i = 0; i <30; i++) ticket.sale(); }, "B").start();
- new Thread(() -> { for (int i = 0; i <30; i++) ticket.sale(); }, "C").start();
- }
- }
- class Ticket {
- private int number = 30;
- Lock lock = new ReentrantLock();
- public void sale() {
- lock.lock();
- try {
- if (number> 0) {
- System.out.println(Thread.currentThread().getName() + "剩余" + (--number) + "张");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
3.3,synchronized 和 Lock 区别
4, 生产者和消费者问题
面试的: 单例模式, 排序算法, 生产者和消费者, 死锁
4.1, 生产者和消费者问题 synchronized 版
- package com.kuang.pc;
- public class A {
- public static void main(String[] args) {
- Data data = new Data();
- new Thread(()->{
- try {
- for (int i = 0; i <10; i++)
- data.increament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },"A").start();
- new Thread(()->{
- try {
- for (int i = 0; i <10; i++)
- data.decreament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },"B").start();
- }
- }
- class Data{
- private int number = 0;
- public synchronized void increament() throws InterruptedException {
- if (number!=0){
- wait();
- }
- number++;
- System.out.println(Thread.currentThread().getName()+"==>"+number);
- notifyAll();
- }
- public synchronized void decreament() throws InterruptedException {
- if (number==0){
- wait();
- }
- number--;
- System.out.println(Thread.currentThread().getName()+"==>"+number);
- notifyAll();
- }
- }
问题存在, A B C D 4 个线程! 虚假唤醒
if 改成 while
- package com.kuang.pc;
- public class A {
- public static void main(String[] args) {
- Data data = new Data();
- new Thread(()->{
- try {
- for (int i = 0; i <10; i++)
- data.increament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },"A").start();
- new Thread(()->{
- try {
- for (int i = 0; i <10; i++)
- data.decreament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },"B").start();
- new Thread(()->{
- try {
- for (int i = 0; i <10; i++)
- data.increament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },"C").start();
- new Thread(()->{
- try {
- for (int i = 0; i <10; i++)
- data.decreament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },"D").start();
- }
- }
- class Data{
- private int number = 0;
- public synchronized void increament() throws InterruptedException {
- while (number!=0){
- wait();
- }
- number++;
- System.out.println(Thread.currentThread().getName()+"==>"+number);
- notifyAll();
- }
- public synchronized void decreament() throws InterruptedException {
- while (number==0){
- wait();
- }
- number--;
- System.out.println(Thread.currentThread().getName()+"==>"+number);
- notifyAll();
- }
- }
4.2,JUC 版的生产者和消费者问题
代码实现:
- package com.kuang.pc;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- public class A {
- public static void main(String[] args) {
- Data data = new Data();
- new Thread(() -> {
- try {
- for (int i = 0; i <10; i++)
- data.increament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "A").start();
- new Thread(() -> {
- try {
- for (int i = 0; i <10; i++)
- data.decreament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "B").start();
- new Thread(() -> {
- try {
- for (int i = 0; i <10; i++)
- data.increament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "C").start();
- new Thread(() -> {
- try {
- for (int i = 0; i <10; i++)
- data.decreament();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "D").start();
- }
- }
- class Data {
- private int number = 0;
- Lock lock = new ReentrantLock();
- Condition condition = lock.newCondition();
- public void increament() throws InterruptedException {
- lock.lock();
- try {
- while (number != 0) {
- condition.await();
- }
- number++;
- System.out.println(Thread.currentThread().getName() + "==>" + number);
- condition.signalAll();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- public void decreament() throws InterruptedException {
- lock.lock();
- try {
- while (number == 0) {
- condition.await();
- }
- number--;
- System.out.println(Thread.currentThread().getName() + "==>" + number);
- condition.signalAll();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
任何一个新的技术, 绝对不仅仅是覆盖了原来的技术, 一定会有优势和补充!
4.3,Condition----- 精准的通知和唤醒线程
代码测试:
- package com.kuang.pc;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- public class A {
- public static void main(String[] args) {
- Data data = new Data();
- new Thread(() -> {
- for (int i = 0; i <10; i++)
- data.printA();
- }, "A").start();
- new Thread(() -> {
- for (int i = 0; i <10; i++)
- data.printB();
- }, "B").start();
- new Thread(() -> {
- for (int i = 0; i <10; i++)
- data.printC();
- }, "C").start();
- }
- }
- class Data {
- private int number = 1;
- private Lock lock = new ReentrantLock();
- private Condition condition1 = lock.newCondition();
- private Condition condition2 = lock.newCondition();
- private Condition condition3 = lock.newCondition();
- public void printA() {
- lock.lock();
- try {
- while (number!=0){
- condition1.await();
- }
- number++;
- System.out.println(Thread.currentThread().getName()+"=>AAAAAA");
- condition2.signal();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- public void printB() {
- lock.lock();
- try {
- while (number!=1){
- condition2.await();
- }
- number++;
- System.out.println(Thread.currentThread().getName()+"=>BBBBBB");
- condition3.signal();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- public void printC() {
- lock.lock();
- try {
- while (number!=2){
- condition3.await();
- }
- number=0;
- System.out.println(Thread.currentThread().getName()+"=>CCCCCC");
- condition1.signal();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
5,8 锁现象
如何判断锁的对象! 永远知道什么是锁, 锁到底锁的是谁?
对象, Class
6, 集合类不安全
6.1,List 不安全
6.2,Set 不安全
hashSet 底层是什么?
6.3,Map 不安全
回顾 Map 的基本操作
7,Callable
Callable 和 Runnable 的区别:
可以有返回值
可以抛出异常
方法不同, call()/ run()
代码测试:
- package com.kuang;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.FutureTask;
- public class Test {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 适配类
- FutureTask futureTask = new FutureTask(new Callable() {
- @Override
- public Integer call() throws Exception {
- System.out.println("call()");
- return 1024;
- }
- });
- new Thread(futureTask,"A").start();
- new Thread(futureTask,"B").start(); // 结果会被缓存, 效率高
- // 获取 Callable 返回结果并打印
- System.out.println(futureTask.get());
- }
- }
细节:
有缓存
结果可能需要等待, 会阻塞!
8, 常用的辅助类 (必会)
8.1,CountDownLatch
- package com.kuang;
- import java.util.concurrent.CountDownLatch;
- // 计数器
- public class CountDownLatchDemo {
- public static void main(String[] args) throws InterruptedException {
- // 总数是 6, 必须要执行任务的时候, 再使用!
- CountDownLatch countDownLatch = new CountDownLatch(6);
- for (int i = 1; i <= 6; i++) {
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName()+"Go Out");
- countDownLatch.countDown();// 数量减一
- }, String.valueOf(i)).start();
- }
- countDownLatch.await();// 等待计数器归零, 再向下执行
- System.out.println("Close Door");
- }
- }
8.2,CyclicBarrier
加法计数器
- package com.kuang;
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
- public class CyclicBarrierDemo {
- public static void main(String[] args) {
- CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
- System.out.println("召唤神龙!");
- });
- for (int i = 1; i <= 7; i++) {
- final int temp = i;
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠");
- try {
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }, String.valueOf(i)).start();
- }
- }
- }
- 8.3,Semaphore
Semaphore: 信号量
- package com.kuang;
- import java.util.concurrent.Semaphore;
- import java.util.concurrent.TimeUnit;
- public class SemaphoreDemo {
- public static void main(String[] args) {
- // 线程数量: 停车位! 限流!
- Semaphore semaphore = new Semaphore(3);
- for (int i = 1; i <= 6; i++) {
- new Thread(() -> {
- //acquire(); 得到
- try {
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName()+"抢到车位");
- TimeUnit.SECONDS.sleep(2);
- System.out.println(Thread.currentThread().getName()+"离开车位");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- semaphore.release();
- }
- //release() 释放
- }, String.valueOf(i)).start();
- }
- }
- }
9, 读写锁
ReadWriteLock
- package com.kuang;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.locks.ReadWriteLock;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
- /**
- * 独占锁 (写锁) 一次只能被一个线程占有
- * 共享锁 (读锁) 多个线程可以同时占有
- */
- public class ReadWriteLockDemo {
- public static void main(String[] args) {
- MyCache myCache = new MyCache();
- for (int i = 1; i <= 10; i++) {
- final int temp = i;
- new Thread(() -> {
- myCache.put(temp+"",temp+"");
- }, String.valueOf(i)).start();
- }
- for (int i = 1; i <= 10; i++) {
- final int temp = i;
- new Thread(() -> {
- myCache.get(temp+"");
- }, String.valueOf(i)).start();
- }
- }
- }
- class MyCache{
- private volatile Map<String,String> map = new HashMap<>();
- private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- public void put(String key,String value){
- readWriteLock.writeLock().lock();
- try {
- System.out.println(Thread.currentThread().getName()+"写入");
- map.put(key,value);
- System.out.println(Thread.currentThread().getName()+"写入成功");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- readWriteLock.writeLock().unlock();
- }
- }
- public void get(String key){
- readWriteLock.readLock().lock();
- try {
- System.out.println(Thread.currentThread().getName()+"读取");
- map.get(key);
- System.out.println(Thread.currentThread().getName()+"读取成功");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- readWriteLock.readLock().unlock();
- }
- }
- }
10, 阻塞队列
阻塞队列:
BlockingQueue BlockingQueue 不是新的东西
SynchronousQueue 同步队列
代码略
11, 线程池 (重点)
线程池: 三大方法, 7 大参数, 4 种拒绝策略
线程池: 三大方法
- package com.kuang;
- import java.util.concurrent.Executor;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- public class Demo {
- public static void main(String[] args) {
- // Executors.newSingleThreadExecutor();
- ExecutorService threadPool = Executors.newCachedThreadPool();
- // Executors.newFixedThreadPool(3);
- try {
- for (int i = 0; i <10; i++) {
- threadPool.execute(() -> {
- System.out.println(Thread.currentThread().getName());
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- threadPool.shutdown();
- }
- }
- }
7 大参数
源码探究
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//21 亿
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
- // 本质 ThreadPoolExecutor()
- public ThreadPoolExecutor(int corePoolSize,// 核心线程池大小
- int maximumPoolSize,// 最大核心线程池大小
- long keepAliveTime,// 超时了没有人调用就会释放
- TimeUnit unit,// 超时单位
- BlockingQueue<Runnable> workQueue,// 阻塞队列
- ThreadFactory threadFactory,// 线程工厂, 创建线程的, 一般不用动
- RejectedExecutionHandler handler// 拒绝策略) {
- if (corePoolSize <0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
手动创建一个线程池
- package com.kuang;
- import java.util.concurrent.*;
- public class Demo {
- public static void main(String[] args) {
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
- 2,
- 5,
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(3),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
- try {
- for (int i = 0; i <10; i++) {
- threadPool.execute(() -> {
- System.out.println(Thread.currentThread().getName());
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- threadPool.shutdown();
- }
- }
- }
四种拒绝策略
小结和拓展
了解: IO 密集型, CPU 密集型 (调优)
- package com.kuang;
- import java.util.concurrent.*;
- public class Demo {
- public static void main(String[] args) {
- //CPU 密集型
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
- 2,
- Runtime.getRuntime().availableProcessors(),
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(3),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
- try {
- for (int i = 0; i <10; i++) {
- threadPool.execute(() -> {
- System.out.println(Thread.currentThread().getName());
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- threadPool.shutdown();
- }
- }
- }
12, 四大函数式接口 (必须掌握)
新时代的程序员: lambda 表达式, 链式编程, 函数式接口, Stream 流式计算
函数式接口 有且只有一个方法的接口
- @FunctionalInterface
- public interface Runnable {
- public abstract void run();
- }
- // 超级多 FunctionalInterface
- // 简化编程模型, 在新版本的框架底层大量应用!
- //foreach(消费者类的函数式接口)
代码测试:
Function 函数式接口
断定型接口 有一个输入参数, 返回值只能式 布尔值!
Consumer 消费型接口
Supplier 供给型接口
13,Stream 流式计算
什么是 Stream 流式计算
大数据: 存储 + 计算
存储: 集合, MySQL 本质就是存储东西的
计算都应该交给流来操作!
14,ForkJoin
什么是 ForkJoin
ForkJoin 在 JDK1.7, 并行执行任务! 提高效率. 大数据量!
大数据: Map Reduce(把大任务拆分为小任务)
ForkJoin 特点: 工作窃取
这里面维护的都是双端队列
ForkJoin
测试:
15, 异步回调
16,JMM
线程 工作内存 主内存
8 种操作:
问题: 程序不知道主内存的值已经被修改过了
17,Volatile
保证可见性
不保证原子性
原子性: 不可分割
线程 A 在执行任务的时候, 不能被打扰的, 也不能被分割. 要么同时成功, 要么同时失败
如果不加 lock 和 synchronized , 怎么样保证原子性
使用原子类, 解决 原子性问题
这些类的底层都直接和操作系统挂钩! 在内存中修改值! Unsafe 类是一个很特殊的存在!
指令重排
什么是指令重排: 你写的程序, 计算机并不是按照你写的那样去执行的.
源代码 --> 编译器优化的重排 --> 指令并行也可能重排 --> 内存系统也会重排 --> 执行
18, 彻底玩转单例模式
饿汉式 DCL 懒汉式, 探究!
饿汉式
DCL 懒汉式
静态内部类
单例不安全, 反射
枚举
枚举类型的最终反编译源码:
19, 深入理解 CAS
什么是 CAS
大厂你必须要深入研究底层! 有所突破! 修内功, 操作系统, 计算机网络原理
Unsafe 类
CAS: 比较当前工作内存中的值和主内存中的值, 如果这个值是期望的, 那么则执行操作! 如果不是就一直循环!
缺点:
循环会耗时
一次性只能保证一个共享变量的原子性
存在 ABA 问题
CAS 之 ABA 问题 (狸猫换太子)
20, 原子引用
解决 ABA 问题, 引入原子引用! 对应的思想: 乐观锁!
带版本号的原子操作!
注意:
21, 各种锁的理解
公平锁, 非公平锁
非公平锁: 非常不公平, 可以插队 (默认都是非公平)
可重入锁
可重入锁 (递归锁)
synchronized 版 (代码略)
lock 版 (代码略)
自旋锁
spinlock
自定义的锁
测试:
死锁
死锁是什么?
死锁测试, 怎么排除死锁:
解决问题:
面试, 工作中! 排查问题:
日志 9 人 (领导不喜欢, 运维不喜欢)
堆栈 1 人 (领导喜欢, 此方法不用麻烦运维)
来源: http://www.bubuko.com/infodetail-3493972.html