并发包
java.util.concurrent 从 jdk1.5 开始新加入的一个包, 致力于解决并发编程的线程安全问题, 使用户能够更为快捷方便的编写多线程情况下的并发程序.
同步容器
同步容器只有包括 Vector 和 HashTable, 相比其他容器类只是多用了 Synchronize 的技术
Vector 与 ArrayList 区别
1.ArrayList 是最常用的 List 实现类, 内部是通过数组实现的, 它允许对元素进行快速随机访问. 数组的缺点是每个元素之间不能有间隔, 当数组大小不满足时需要增加存储能力, 就要讲已经有数组的数据复制到新的存储空间中. 当从 ArrayList 的中间位置插入或者删除元素时, 需要对数组进行复制, 移动, 代价比较高. 因此, 它适合随机查找和遍历, 不适合插入和删除.
2.Vector 与 ArrayList 一样, 也是通过数组实现的, 不同的是它支持线程的同步, 即某一时刻只有一个线程能够写 Vector, 避免多线程同时写而引起的不一致性, 但实现同步需要很高的花费, 因此, 访问它比访问 ArrayList 慢
注意: Vector 线程安全, ArrayList 线程不安全
HasTable 与 HasMap 区别
1.HashMap 不是线程安全的
HastMap 是一个接口 是 map 接口的子接口, 是将键映射到值的对象, 其中键和值都是对象, 并且不能包含重复键, 但可以包含重复值. HashMap 允许 null key 和 null value, 而 hashtable 不允许.
2.HashTable 是线程安全的一个 Collection.
3.HashMap 是 Hashtable 的轻量级实现 (非线程安全的实现), 他们都完成了 Map 接口, 主要区别在于 HashMap 允许空(null) 键值(key), 由于非线程安全, 效率上可能高于 Hashtable.
HashMap 允许将 null 作为一个 entry 的 key 或者 value, 而 Hashtable 不允许.
HashMap 把 Hashtable 的 contains 方法去掉了, 改成 containsvalue 和 containsKey.
注意: HashTable 线程安全, HashMap 线程不安全.
synchronizedMap synchronizedList
Collections.synchronized*(m) 将线程不安全集合变为线程安全集合
- Map m = Collections.synchronizedMap(new HashMap());
- List l = Collections.synchronizedList(new ArrayList());
- ConcurrentHashMap
ConcurrentHashMap 内部使用段 (Segment) 来表示这些不同的部分, 每个段其实就是一个小的 HashTable, 它们有自己的锁. 只要多个修改操作发生在不同的段上, 它们就可以并发进行. 把一个整体分成了 16 个段 (Segment) 也就是最高支持 16 个线程的并发修改操作.
这也是在重线程场景时减小锁的粒度从而降低锁竞争的一种方案. 并且代码中大多共享变量使用 volatile 关键字声明, 目的是第一时间获取修改的内容, 性能非常好.
大家可以点击加入群:[Java 高级架构进阶群] :854180697 里面有 Java 高级大牛直播讲解知识点 走的就是高端路线,(如果你想跳槽换工作 但是技术又不够 或者工作上遇到了瓶颈 , 我这里有一个 JAVA 的免费直播课程 , 讲的是高端的知识点基础不好的误入哟, 只要你有 1-5 年的开发经验可以加群找我要课堂链接 注意: 是免费的 没有开发经验误入哦)
并发容器
CountDownLatch
CountDownLatch 是 JAVA 提供在 java.util.concurrent 包下的一个辅助类, 可以把它看成是一个计数器, 其内部维护着一个 count 计数, 只不过对这个计数器的操作都是原子操作, 同时只能有一个线程去操作这个计数器, CountDownLatch 通过构造函数传入一个初始计数值, 调用者可以通过调用 CounDownLatch 对象的 cutDown()方法, 来使计数减 1; 如果调用对象上的 await()方法, 那么调用者就会一直阻塞在这里, 直到别人通过 cutDown 方法, 将计数减到 0, 才可以继续执行.
public class Test002 { public static void main(String[] args) throws InterruptedException { System.out.println("等待子线程执行完毕..."); CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { System.out.println("子线程," +Thread.currentThread().getName() + "开始执行..."); countDownLatch.countDown();// 每次减去 1 System.out.println("子线程," + Thread.currentThread().getName() + "结束执行..."); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println("子线程," + Thread.currentThread().getName() + "开始执行..."); countDownLatch.countDown(); System.out.println("子线程," + Thread.currentThread().getName() + "结束执行..."); } }).start(); countDownLatch.await();// 调用当前方法主线程阻塞 countDown 结果为 0, 阻塞变为运行状态 System.out.println("两个子线程执行完毕...."); System.out.println("继续主线程执行.."); }}
CyclicBarrier
一个同步辅助类, 它允许一组线程互相等待, 直到到达某个公共屏障点 (common barrier point). 在涉及一组固定大小的线程的程序中, 这些线程必须不时地互相等待, 此时 CyclicBarrier 很有用. 因为该 barrier 在释放等待线程后可以重用, 所以称它为循环 的 barrier.
使用场景
需要所有的子任务都完成时, 才执行主任务, 这个时候就可以选择使用 CyclicBarrier.
public class CyclicBarrierTest { public static void main(String[] args) throws IOException, InterruptedException { // 如果将参数改为 4, 但是下面只加入了 3 个选手, 这永远等待下去 //Waits until all parties have invoked await on this barrier. CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "1 号选手"))); executor.submit(new Thread(new Runner(barrier, "2 号选手"))); executor.submit(new Thread(new Runner(barrier, "3 号选手"))); executor.shutdown(); }}class Runner implements Runnable { // 一个同步辅助类, 它允许一组线程互相等待, 直到到达某个公共屏障点 (common barrier point) private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { super(); this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(name + "准备好了..."); // barrier 的 await 方法, 在所有参与者都已经在此 barrier 上调用 await 方法之前, 将一直等待. barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + "起跑!"); }}
Semaphore
Semaphore 是计数信号量. Semaphore 管理一系列许可证. 每个 acquire 方法阻塞, 直到有一个许可证可以获得然后拿走一个许可证; 每个 release 方法增加一个许可证, 这可能会释放一个阻塞的 acquire 方法. 然而, 其实并没有实际的许可证这个对象, Semaphore 只是维持了一个可获得许可证的数量.
Semaphore 经常用于限制获取某种资源的线程数量
需求: 一个厕所只有 3 个坑位, 但是有 10 个人来上厕所, 那怎么办? 假设 10 个人的编号分别为 1-10, 并且 1 号先到厕所, 10 号最后到厕所. 那么 1-3 号来的时候必然有可用坑位, 顺利如厕, 4 号来的时候需要看看前面 3 人是否有人出来了, 如果有人出来, 进去, 否则等待. 同样的道理, 4-10 号也需要等待正在上厕所的人出来后才能进去, 并且谁先进去这得看等待的人是否有素质, 是否能遵守先来先上的规则.
class Parent implements Runnable { private String name; private Semaphore wc; public Parent(String name,Semaphore wc){ this.name=name; this.wc=wc; } @Override public void run() { try { // 剩下的资源(剩下的茅坑) int availablePermits = wc.availablePermits(); if (availablePermits> 0) { System.out.println(name+"天助我也, 终于有茅坑了..."); } else { System.out.println(name+"怎么没有茅坑了..."); } // 申请茅坑 如果资源达到 3 次, 就等待 wc.acquire(); System.out.println(name+"终于轮我上厕所了.. 爽啊"); Thread.sleep(new Random().nextInt(1000)); // 模拟上厕所时间. System.out.println(name+"厕所上完了..."); wc.release(); } catch (Exception e) { } }}public class TestSemaphore02 { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i <=10; i++) { Parent parent = new Parent("第"+i+"个人,",semaphore); new Thread(parent).start(); } }}
并发队列
ConcurrentLinkedQueue
ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列, 通过无锁的方式, 实现了高并发状态下的高性能, 通常 ConcurrentLinkedQueue 性能好于 BlockingQueue. 它是一个基于链接节点的无界线程安全队列. 该队列的元素遵循先进先出的原则. 头是最先加入的, 尾是最近加入的, 该队列不允许 null 元素.
add 和 offer() 都是加入元素的方法(在 ConcurrentLinkedQueue 中这俩个方法没有任何区别) poll() 和 peek() 都是取头元素节点, 区别在于前者会删除元素, 后者不会.
- public class ConcurrentLinkedQueueTest {
- private static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
- private static int count = 2; // 线程个数
- //CountDownLatch, 一个同步辅助类, 在完成一组正在其他线程中执行的操作之前, 它允许一个或多个线程一直等待.
- private static CountDownLatch latch = new CountDownLatch(count); public static void main(String[] args) throws InterruptedException { long timeStart = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(4); ConcurrentLinkedQueueTest.offer(); for (int i = 0; i <count; i++) { es.submit(new Poll()); } latch.await(); // 使得主线程 (main) 阻塞直到 latch.countDown()为零才继续执行 System.out.println("cost time" + (System.currentTimeMillis() - timeStart) + "ms"); es.shutdown(); } /** * 生产 */ public static void offer() { for (int i = 0; i < 100000; i++) { queue.offer(i); } } static class Poll implements Runnable { public void run() { // while (queue.size()>0) { while (!queue.isEmpty()) { System.out.println(queue.poll()); } latch.countDown(); } }}
- BlockingQueue
阻塞队列 (BlockingQueue) 是一个支持两个附加操作的队列. 这两个附加的操作是:
在队列为空时, 获取元素的线程会等待队列变为非空.
当队列满时, 存储元素的线程会等待队列可用.
阻塞队列常用于生产者和消费者的场景, 生产者是往队列里添加元素的线程, 消费者是从队列里拿元素的线程. 阻塞队列就是生产者存放元素的容器, 而消费者也只从容器里拿元素
ArrayBlockingQueue
ArrayBlockingQueue 是一个有边界的阻塞队列, 它的内部实现是一个数组. 有边界的意思是它的容量是有限的, 我们必须在其初始化的时候指定它的容量大小, 容量大小一旦指定就不可改变.
ArrayBlockingQueue 是以先进先出的方式存储数据, 最新插入的对象是尾部, 最新移出的对象是头部.
LinkedBlockingQueue
LinkedBlockingQueue 阻塞队列大小的配置是可选的, 如果我们初始化时指定一个大小, 它就是有边界的, 如果不指定, 它就是无边界的. 说是无边界, 其实是采用了默认大小为 Integer.MAX_VALUE 的容量 . 它的内部实现是一个链表
SynchronousQueue
SynchronousQueue 队列内部仅允许容纳一个元素. 当一个线程插入一个元素后会被阻塞, 除非这个元素被另一个线程消费
- public class BlockingQueueTest2 { /** * * 定义装苹果的篮子 * */ public class Basket { // 篮子, 能够容纳 3 个苹果 BlockingQueue basket = new LinkedBlockingQueue(3); // 生产苹果, 放入篮子 public void produce() throws InterruptedException { // put 方法放入一个苹果, 若 basket 满了, 等到 basket 有位置 basket.put("An apple"); } // 消费苹果, 从篮子中取走 public String consume() throws InterruptedException { // take 方法取出一个苹果, 若 basket 为空, 等到 basket 有苹果为止(获取并移除此队列的头部) return basket.take(); } } // 定义苹果生产者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + instance); basket.produce(); System.out.println("! 生产者生产苹果完毕:" + instance); // 休眠 300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定义苹果消费者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + instance); System.out.println(basket.consume()); System.out.println("! 消费者消费苹果完毕:" + instance); // 休眠 1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 建立一个装苹果的篮子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生产者 001", basket); Producer producer2 = test.new Producer("生产者 002", basket); Consumer consumer = test.new Consumer("消费者 001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行 5s 后, 所有任务停止 // try {// Thread.sleep(1000 * 5);// } catch (InterruptedException e) {// e.printStackTrace();// }// service.shutdownNow(); }}
- PriorityBlockingQueue
优先级阻塞队列, 该实现类需要自己实现一个继承了 Comparator 接口的类, 在插入资源时会按照自定义的排序规则来对资源数组进行排序. 其中值大的排在数组后面 , 取值时从数组头开始取
public class TestQueue{ static Logger logger = LogManager.getLogger(); static Random random = new Random(47); public static void main(String args[]) throws InterruptedException { PriorityBlockingQueue queue = new PriorityBlockingQueue(); ExecutorService executor = Executors.newCachedThreadPool(); executor.execute(new Runnable() { public void run() { int i = 0; while (true) { queue.put(new PriorityEntity(random.nextInt(10), i++)); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); } catch (InterruptedException e) { logger.error(e); } } } }); executor.execute(new Runnable() { public void run() { while (true) { try { System.out.println("take--" + queue.take() + "left:-- [" + queue.toString() + "]"); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(3000)); } catch (InterruptedException e) { logger.error(e); } } catch (InterruptedException e) { logger.error(e); } } } }); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { logger.error(e); } } static class PriorityEntity implements Comparable { private static int count = 0; private int id = count++; private int priority; private int index = 0; public PriorityEntity(int _priority, int _index) { System.out.println("_priority :" + _priority); this.priority = _priority; this.index = _index; } public String toString() { return id + "# [index=" + index + "priority=" + priority + "]"; } // 数字小, 优先级高 public int compareTo(PriorityEntity o) { return this.priority> o.priority ? 1 : this.priority <o.priority ? -1 : 0; } }}
线程池
开发过程中, 合理地使用线程池可以带来 3 个好处:
降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗.
提高响应速度: 当任务到达时, 任务可以不需要等到线程创建就能立即执行.
提高线程的可管理性: 线程是稀缺资源, 如果无限制地创建, 不仅会消耗系统资源, 还会降低系统的稳定性, 使用线程池可以进行统一分配, 调优和监控.
线程池作用
线程池作用就是限制系统中执行线程的数量.
根据系统的环境情况, 可以自动或手动设置线程数量, 达到运行的最佳效果; 少了浪费了系统资源, 多了造成系统拥挤效率不高. 用线程池控制线程数量, 其他线程排队等候. 一个任务执行完毕, 再从队列的中取最前面的任务开始执行. 若队列中没有等待进程, 线程池的这一资源处于等待. 当一个新任务需要运行时, 如果线程池中有等待的工作线程, 就可以开始运行了; 否则进入等待队列.
线程池的分类
newCachedThreadPool
创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程, 若无可回收, 则新建线程
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { public void run() { System.out.println(index); } }); } } }
线程池为无限大, 当执行第二个任务时第一个任务已经完成, 会复用执行第一个任务的线程, 而不用每次新建线程
newFixedThreadPool
创建一个定长线程池, 可控制线程最大并发数, 超出的线程会在队列中等待.
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
因为线程池大小为 3, 每个任务输出 index 后 sleep 2 秒, 所以每两秒打印 3 个数字. 定长线程池的大小最好根据系统资源进行设置. 如 Runtime.getRuntime().availableProcessors()
newScheduledThreadPool
创建一个定长线程池, 支持定时及周期性任务执行.
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }public class ThreadPoolExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } }
表示延迟 3 秒执行
public class ThreadPoolExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); } }
表示延迟 1 秒后每 3 秒执行一次
newSingleThreadExecutor
创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序 (FIFO, LIFO, 优先级) 执行.
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
阿里发布的 Java 开发手册中强制线程池不允许使用 Executors 去创建, 而是通过 ThreadPoolExecutor 的方式, 这样的处理方式让写的同学更加明确线程池的运行规则, 规避资源耗尽的风险
- ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
corePoolSize - 线程池核心池的大小.
maximumPoolSize - 线程池的最大线程数.
keepAliveTime - 当线程数大于核心时, 此为终止前多余的空闲线程等待新任务的最长时间.
unit - keepAliveTime 的时间单位.
workQueue - 用来储存等待执行任务的队列.
threadFactory - 线程工厂.
handler - 拒绝策略.
线程优先级:
corePoolSize> workQueue> maximumPoolSize>handler(拒绝)
拒绝策略:
CallerRunsPolicy : 这个策略重试添加当前的任务, 他会自动重复调用 execute() 方法, 直到成功.
AbortPolicy : 对拒绝任务抛弃处理, 并且抛出异常.(默认使用的)
DiscardPolicy : 对拒绝任务直接无声抛弃, 没有异常信息.
DiscardOldestPolicy : 对拒绝任务不抛弃, 而是抛弃队列里面等待最久的一个线程, 然后把拒绝任务加到队列.
合理配置线程池
要想合理的配置线程池, 就必须首先分析任务特性, 可以从以下几个角度来进行分析:
任务的性质: CPU 密集型任务, IO 密集型任务和混合型任务.
任务的优先级: 高, 中和低.
任务的执行时间: 长, 中和短.
任务的依赖性: 是否依赖其他系统资源, 如数据库连接.
任务性质不同的任务可以用不同规模的线程池分开处理. CPU 密集型任务配置尽可能少的线程数量, 如配置 Ncpu+1 个线程的线程池. IO 密集型任务则由于需要等待 IO 操作, 线程并不是一直在执行任务, 则配置尽可能多的线程, 如 2*Ncpu. 混合型的任务, 如果可以拆分, 则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务, 只要这两个任务执行的时间相差不是太大, 那么分解后执行的吞吐率要高于串行执行的吞吐率, 如果这两个任务执行时间相差太大, 则没必要进行分解. 我们可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数.
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理. 它可以让优先级高的任务先得到执行, 需要注意的是如果一直有优先级高的任务提交到队列里, 那么优先级低的任务可能永远不能执行.
执行时间不同的任务可以交给不同规模的线程池来处理, 或者也可以使用优先级队列, 让执行时间短的任务先执行.
依赖数据库连接池的任务, 因为线程提交 SQL 后需要等待数据库返回结果, 如果等待的时间越长 CPU 空闲时间就越长, 那么线程数应该设置越大, 这样才能更好的利用 CPU.
CPU 密集型时, 任务可以少配置线程数, 大概和机器的 cpu 核数相当, 这样可以使得每个线程都在执行任务 IO 密集型时, 大部分线程都阻塞, 故需要多配置线程数, 2*cpu 核数 操作系统之名称解释: 某些进程花费了绝大多数时间在计算上, 而其他则在等待 I/O 上花费了大多是时间, 前者称为计算密集型(CPU 密集型)computer-bound, 后者称为 I/O 密集型, I/O-bound.
写在最后: 欢迎留言讨论, 加关注, 持续更新!!!
来源: http://www.jianshu.com/p/21d4e64bad49