java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的 Java 程序设计语言和 Java 平台(即 JavaEE(j2ee), JavaME(j2me), JavaSE(j2se))的总称。
这篇文章主要介绍了 Java 多线程并发编程的相关资料,非常不错,具有参考借鉴价值, 需要的朋友可以参考下
一、多线程
1、操作系统有两个容易混淆的概念,进程和线程。
进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种资源和状态信息,包括打开的文件、子进程和信号处理。
线程:表示程序的执行流程,是 CPU 调度执行的基本单位;线程有自己的程序计数器、寄存器、堆栈和帧。同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源。
2、Java 标准库提供了进程和线程相关的 API,进程主要包括表示进程的 java.lang.Process 类和创建进程的 java.lang.ProcessBuilder 类;
表示线程的是 java.lang.Thread 类,在虚拟机启动之后,通常只有 Java 类的 main 方法这个普通线程运行,运行时可以创建和启动新的线程;还有一类守护线程(damon thread),守护线程在后台运行,提供程序运行时所需的服务。当虚拟机中运行的所有线程都是守护线程时,虚拟机终止运行。
3、线程间的可见性:一个线程对进程中共享的数据的修改,是否对另一个线程可见
可见性问题:
a、CPU 采用时间片轮转等不同算法来对线程进行调度
- public class IdGenerator {
- private int value = 0;
- public int getNext() {
- return value++;
- }
- }
对于 IdGenerator 的 getNext() 方法,在多线程下不能保证返回值是不重复的:各个线程之间相互竞争 CPU 时间来获取运行机会,CPU 切换可能发生在执行间隙。
以上代码 getNext() 的指令序列:CPU 切换可能发生在 7 条指令之间,多个 getNext 的指令交织在一起。
- aload_0
- dup
- getfield #12
- dup_x1
- iconst_1
- iadd
- putfield #12
b、CPU 缓存:
目前 CPU 一般采用层次结构的多级缓存的架构,有的 CPU 提供了 L1、L2 和 L3 三级缓存。当 CPU 需要读取主存中某个位置的数据时,会一次检查各级缓存中是否存在对应的数据。如果有,直接从缓存中读取,这比从主存中读取速度快很多。当 CPU 需要写入时,数据先被写入缓存中,之后再某个时间点写回主存。所以某些时间点上,缓存中的数据与主存中的数据可能是不一致。
c、指令顺序重排
出行性能考虑,编译器在编译时可能会对字节代码的指令顺序进行重新排列,以优化指令的执行顺序,在单线程中不会有问题,但在多线程可能产生与可见性相关的问题。
二、Java 内存模型(Java Memory Model)
屏蔽了 CPU 缓存等细节,只关注主存中的共享变量;关注对象的实例域、静态域和数组元素;关注线程间的动作。
1、volatile 关键词:用来对共享变量的访问进行同步,上一次写入操作的结果对下一次读取操作是肯定可见的。(在写入 volatile 变量值之后,CPU 缓存中的内容会被写回内存;在读取 volatile 变量时,CPU 缓存中的对应内容会被置为失效,重新从主存中进行读取),volatile 不使用锁,性能优于 synchronized 关键词。
用来确保对一个变量的修改被正确地传播到其他线程中。
例子:A 线程是 Worker,一直跑循环,B 线程调用 setDone(true),A 线程即停止任务
- public class Worker {
- private volatile boolean done;
- public void setDone(boolean done) {
- this.done = done;
- }
- public void work() {
- while (!done) {
- //执行任务;
- }
- }
- }
例子:错误使用。因为没有锁的支持,volatile 的修改不能依赖于当前值,当前值可能在其他线程中被修改。(Worker 是直接赋新值与当前值无关)
- public class Counter {
- public volatile static int count = 0;
- public static void inc() {
- //这里延迟1毫秒,使得结果明显
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- }
- count++;
- }
- public static void main(String[] args) {
- //同时启动1000个线程,去进行i++计算,看看实际结果
- for (int i = 0; i < 1000; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- Counter.inc();
- }
- }).start();
- }
- //这里每次运行的值都有可能不同,可能不为1000
- System.out.println("运行结果:Counter.count=" + Counter.count);
- }
- }
2、final 关键词
final 关键词声明的域的值只能被初始化一次,一般在构造方法中初始化。。(在多线程开发中,final 域通常用来实现不可变对象)
当对象中的共享变量的值不可能发生变化时,在多线程中也就不需要同步机制来进行处理,故在多线程开发中应尽可能使用不可变对象。
另外,在代码执行时,final 域的值可以被保存在寄存器中,而不用从主存中频繁重新读取。
3、java 基本类型的原子操作
1)基本类型,引用类型的复制引用是原子操作;(即一条指令完成)
2)long 与 double 的赋值,引用是可以分割的,非原子操作;
3)要在线程间共享 long 或 double 的字段时,必须在 synchronized 中操作,或是声明成 volatile
三、Java 提供的线程同步方式
1、synchronized 关键字
方法或代码块的互斥性来完成实际上的一个原子操作。(方法或代码块在被一个线程调用时,其他线程处于等待状态)
所有的 Java 对象都有一个与 synchronzied 关联的监视器对象(monitor),允许线程在该监视器对象上进行加锁和解锁操作。
a、静态方法:Java 类对应的 Class 类的对象所关联的监视器对象。
b、实例方法:当前对象实例所关联的监视器对象。
c、代码块:代码块声明中的对象所关联的监视器对象。
注:当锁被释放,对共享变量的修改会写入主存;当活得锁,CPU 缓存中的内容被置为无效。编译器在处理 synchronized 方法或代码块,不会把其中包含的代码移动到 synchronized 方法或代码块之外,从而避免了由于代码重排而造成的问题。
例:以下方法 getNext() 和 getNextV2() 都获得了当前实例所关联的监视器对象
- public class SynchronizedIdGenerator {
- private int value = 0;
- public synchronized int getNext() {
- return value++;
- }
- public int getNextV2() {
- synchronized(this) {
- return value++;
- }
- }
- }
2、Object 类的 wait、notify 和 notifyAll 方法
生产者和消费者模式,判断缓冲区是否满来消费,缓冲区是否空来生产的逻辑。如果用 while 和 volatile 也可以做,不过本质上会让线程处于忙等待,占用 CPU 时间,对性能造成影响。
wait: 将当前线程放入,该对象的等待池中,线程 A 调用了 B 对象的 wait() 方法,线程 A 进入 B 对象的等待池,并且释放 B 的锁。(这里,线程 A 必须持有 B 的锁,所以调用的代码必须在 synchronized 修饰下,否则直接抛出 java.lang.IllegalMonitorStateException 异常)。
notify:将该对象中等待池中的线程,随机选取一个放入对象的锁池,当当前线程结束后释放掉锁, 锁池中的线程即可竞争对象的锁来获得执行机会。
notifyAll:将对象中等待池中的线程,全部放入锁池。
(notify 锁唤醒的线程选择由虚拟机实现来决定,不能保证一个对象锁关联的等待集合中的线程按照所期望的顺序被唤醒,很可能一个线程被唤醒之后,发现他所要求的条件并没有满足,而重新进入等待池。因为当等待池中包含多个线程时,一般使用 notifyAll 方法,不过该方法会导致线程在没有必要的情况下被唤醒,之后又马上进入等待池,对性能有影响,不过能保证程序的正确性)
工作流程:
a、Consumer 线程 A 来 看产品,发现产品为空,调用产品对象的 wait(),线程 A 进入产品对象的等待池并释放产品的锁。
b、Producer 线程 B 获得产品的锁,执行产品的 notifyAll(),Consumer 线程 A 从产品的等待池进入锁池,Producer 线程 B 生产产品,然后退出释放锁。
c、Consumer 线程 A 获得产品锁,进入执行,发现有产品,消费产品,然后退出。
例子:
- public synchronized String pop() {
- this.notifyAll(); // 唤醒对象等待池中的所有线程,可能唤醒的就是 生产者(当生产者发现产品满,就会进入对象的等待池,这里代码省略,基本略同)
- while (index == -1) { //如果发现没产品,就释放锁,进入对象等待池
- this.wait();
- } //当生产者生产完后,消费者从this.wait()方法再开始执行,第一次还会执行循环,万一产品还是为空,则再等待,所以这里必须用while循环,不能用if
- String good = buffer[index];
- buffer[index] = null;
- index--;
- return good; // 消费完产品,退出。
- }
注:wait() 方法有超时和不超时之分,超时的在经过一段时间,线程还在对象的等待池中,那么线程也会推出等待状态。
3、线程状态转换:
已经废弃的方法:stop、suspend、resume、destroy,这些方法在实现上时不安全的。
线程的状态:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超时的等待)、TERMINATED。
a、方法 sleep() 进入的阻塞状态,不会释放对象的锁(即大家一起睡,谁也别想执行代码),所以不要让 sleep 方法处在 synchronized 方法或代码块中,否则造成其他等待获取锁的线程长时间处于等待。
b、方法 join() 则是主线程等待子线程完成,再往下执行。例如 main 方法新建两个线程 A 和 B
- public static void main(String[] args) throws InterruptedException {
- Thread t1 = new Thread(new ThreadTesterA());
- Thread t2 = new Thread(new ThreadTesterB());
- t1.start();
- t1.join(); // 等t1执行完再往下执行
- t2.start();
- t2.join(); // 在虚拟机执行中,这句可能被忽略
- }
c、方法 interrupt(),向被调用的对象线程发起中断请求。如线程 A 通过调用线程 B 的 d 的 interrupt 方法来发出中断请求,线程 B 来处理这个请求,当然也可以忽略,这不是必须的。Object 类的 wait()、Thread 类的 join() 和 sleep 方法都会抛出受检异常 java.lang.InterruptedException,通过 interrupt 方法中断该线程会导致线程离开等待状态。对于 wait() 调用来说,线程需要重新获取监视器对象上的锁之后才能抛出 InterruptedException 异常,并致以异常的处理逻辑。
可以通过 Thread 类的 isInterrupted 方法来判断是否有中断请求发生,通常可以利用这个方法来判断是否退出线程(类似上面的 volatitle 修饰符的例子);
Thread 类还有个方法 Interrupted(),该方法不但可以判断当前线程是否被中断,还会清楚线程内部的中断标记,如果返回 true,即曾被请求中断,同时调用完后,清除中断标记。
如果一个线程在某个对象的等待池,那么 notify 和 interrupt 都可以使该线程从等待池中被移除。如果同时发生,那么看实际发生顺序。如果是 notify 先,那照常唤醒,没影响。如果是 interrupt 先,并且虚拟机选择让该线程中断,那么即使 nofity,也会忽略该线程,而唤醒等待池中的另一个线程。
e、yield(),尝试让出所占有的 CPU 资源,让其他线程获取运行机会,对操作系统上的调度器来说是一个信号,不一定立即切换线程。(在实际开发中,测试阶段频繁调用 yeid 方法使线程切换更频繁,从而让一些多线程相关的错误更容易暴露出来)。
四、非阻塞方式
线程之间同步机制的核心是监视对象上的锁,竞争锁来获得执行代码的机会。当一个对象获取对象的锁,然后其他尝试获取锁的对象会处于等待状态,这种锁机制的实现方式很大程度限制了多线程程序的吞吐量和性能(线程阻塞),且会带来死锁(线程 A 有 a 对象锁,等着获取 b 对象锁,线程 B 有 b 对象锁,等待获取 a 对象锁)和优先级倒置(优先级低的线程获得锁,优先级高的只能等待对方释放锁)等问题。
如果能不阻塞线程,又能保证多线程程序的正确性,就能有更好的性能。
在程序中,对共享变量的使用一般遵循一定的模式,即读取、修改和写入三步组成。之前碰到的问题是,这三步执行中可能线程执行切换,造成非原子操作。锁机制是把这三步变成一个原子操作。
目前 CPU 本身实现 将这三步 合起来 形成一个原子操作,无需线程锁机制干预,常见的指令是 "比较和替换"(compare and swap,CAS),这个指令会先比较某个内存地址的当前值是不是指定的旧指,如果是,就用新值替换,否则什么也不做,指令返回的结果是内存地址的当前值。通过 CAS 指令可以实现不依赖锁机制的非阻塞算法。一般做法是把 CAS 指令的调用放在一个无限循环中,不断尝试,知道 CAS 指令成功完成修改。
java.util.concurrent.atomic 包中提供了 CAS 指令。(不是所有 CPU 都支持 CAS,在某些平台,java.util.concurrent.atomic 的实现仍然是锁机制)
atomic 包中提供的 Java 类分成三类:
1、支持以原子操作来进行更新的数据类型的 Java 类(AtomicBoolean、AtomicInteger、AtomicReference),在内存模型相关的语义上,这四个类的对象类似于 volatile 变量。
类中的常用方法:
a、compareAndSet:接受两个参数,一个是期望的旧值,一个是替换的新值。
b、weakCompareAndSet:效果同 compareAndSet(JSR 中表示 weak 原子方式读取和有条件地写入变量但不创建任何 happen-before 排序,但在源代码中和 compareAndSet 完全一样,所以并没有按 JSR 实现)
c、get 和 set:分别用来直接获取和设置变量的值。
d、lazySet:与 set 类似,但允许编译器把 lazySet 方法的调用与后面的指令进行重排,因此对值得设置操作有可能被推迟。
例:
- public class AtomicIdGenerator {
- private final AtomicInter counter = new AtomicInteger(0);
- public int getNext() {
- return counter.getAndIncrement();
- }
- }
- // getAndIncrement方法的内部实现方式,这也是CAS方法的一般模式,CAS方法不一定成功,所以包装在一个无限循环中,直到成功
- public final int getAndIncrement() {
- for (;;) {
- int current = get();
- int next = current + 1;
- if (compareAndSet(current, next)) return current;
- }
- }
2、提供对数组类型的变量进行处理的 Java 类,AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray 类。(同上,只是放在类数组里,调用时也只是多了一个操作元素索引的参数)
3、通过反射的方式对任何对象中包含的 volatitle 变量使用 CAS 方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他们提供了一种方式把 CAS 的功能扩展到了任何 Java 类中声明为 volatitle 的域上。(灵活,但语义较弱,因为对象的 volatitle 可能被非 atomic 的其他方式被修改)
- public class TreeNode{
- private volatile TreeNode parent;
- // 静态工厂方法
- private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");
- public boolean compareAndSetParent(TreeNode expect, TreeNode update){
- return parentUpdater.compareAndSet(this, expect, update);
- }
- }
注:java.util.concurrent.atomic 包中的 Java 类属于比较底层的实现,一般作为 java.util.concurrent 包中很多非阻塞的数据结构的实现基础。
比较多的用 AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference。在实现线程安全的计数器时,AtomicInteger 和 AtomicLong 类时最佳的选择。
五、高级同步机制(比 synchronized 更灵活的加锁机制)
synchronized 和 volatile,以及 wait、notify 等方法抽象层次低,在程序开发中使用比较繁琐,易出错。
而多线程之间的交互来说,存在某些固定的模式,如生产者 - 消费者和读者 - 写者模式,把这些模式抽象成高层 API,使用起来会非常方便。
java.util.concurrent 包为多线程提供了高层的 API,满足日常开发中的常见需求。
常用接口
1、Lock 接口,表示一个锁方法:
a、lock(),获取所,如果无法获取所锁,会处于等待状态
b、unlock(),释放锁。(一般放在 finally 代码块中)
c、lockInterruptibly(),与 lock() 类似,但允许当前线程在等待获取锁的过程中被中断。(所以要处理 InterruptedException)
d、tryLock(),以非阻塞方式获取锁,如果无法获取锁,则返回 false。(tryLock() 的另一个重载可以指定超时,如果指定超时,当无法获取锁,会等待而阻塞,同时线程可以被中断)
2、ReadWriteLock 接口,表示两个锁,读取的共享锁和写入的排他锁。(适合常见的读者 -- 写者场景)
ReadWriteLock 接口的 readLock 和 writeLock 方法来获取对应的锁的 Lock 接口的实现。
在多数线程读取,少数线程写入的情况下,可以提高多线程的性能,提高使用该数据结构的吞吐量。
如果是相反的情况,较多的线程写入,则接口会降低性能。
3、ReentrantLock 类和 ReentrantReadWriteLock,分别为上面两个接口的实现类。
他们具有重入性:即允许一个线程多次获取同一个锁(他们会记住上次获取锁并且未释放的线程对象,和加锁的次数,getHoldCount())
同一个线程每次获取锁,加锁数 + 1,每次释放锁,加锁数 - 1,到 0,则该锁被释放,可以被其他线程获取。
- public class LockIdGenrator {
- //new ReentrantLock(true)是重载,使用更加公平的加锁机制,在锁被释放后,会优先给等待时间最长的线程,避免一些线程长期无法获得锁
- private int ReentrantLock lock = ReentrantLock();
- privafte int value = 0;
- public int getNext() {
- lock.lock(); //进来就加锁,没有锁会等待
- try {
- return value++; //实际操作
- } finally {
- lock.unlock(); //释放锁
- }
- }
- }
注:重入性减少了锁在各个线程之间的等待,例如便利一个 HashMap,每次 next() 之前加锁,之后释放,可以保证一个线程一口气完成便利,而不会每次 next() 之后释放锁,然后和其他线程竞争,降低了加锁的代价, 提供了程序整体的吞吐量。(即,让一个线程一口气完成任务,再把锁传递给其他线程)。
4、Condition 接口,Lock 接口代替了 synchronized,Condition 接口替代了 object 的 wait、nofity。
a、await(),使当前线程进入等待状态,知道被唤醒或中断。重载形式可以指定超时时间。
b、awaitNanos(),以纳秒为单位等待。
c、awaitUntil(),指定超时发生的时间点,而不是经过的时间,参数为 java.util.Date。
d、awaitUninterruptibly(),前面几种会响应其他线程发出的中断请求,他会无视,直到被唤醒。
注:与 Object 类的 wait() 相同,await() 会释放其所持有的锁。
e、signal() 和 signalAll, 相当于 notify 和 notifyAll
- Lock lock = new ReentrantLock();
- Condition condition = lock.newCondition();
- lock.lock();
- try{
- while(/*逻辑条件不满足*/){
- condition.await();
- }
- }finally{
- lock.unlock();
- }
六、底层同步器
多线程程序中,线程之间存在多种不同的同步方式。除了 Java 标准库提供的同步方式之外,程序中特有的同步方式需要由开发人员自己来实现。
常见的一种需求是 对有限个共享资源的访问,比如多台个人电脑,2 台打印机,当多个线程在等待同一个资源时,从公平角度出发,会用 FIFO 队列。
如果程序中的同步方式可以抽象成对有限个资源的访问,那么可以使用 java.util.concurrent.locks 包中的 AbstractQueuedSynchronizer 类和 AbstractQueuedLongSynchronizer 类作为实现的基础,前者用 int 类型的变量来维护内部状态,而后者用 long 类型。(可以将这个变量理解为共享资源个数)
通过 getState、setState、和 compareAndSetState3 个方法更新内部变量的值。
AbstractQueuedSynchronizer 类是 abstract 的,需要覆盖其中包含的部分方法,通常做法是把其作为一个 Java 类的内部类,外部类提供具体的同步方式,内部类则作为实现的基础。有两种模式,排他模式和共享模式,分别对应方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在这些方法中,使用 getState、setState、compareAndSetState3 个方法来修改内部变量的值,以此来反应资源的状态。
- public class SimpleResourceManager {
- private final InnerSynchronizer synchronizer;
- private static class InnerSynchronizer extends AbstractQueuedSynchronizer {
- InnerSynchronizer(int numOfResources) {
- setState(numOfResources);
- }
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();
- int remain = available - acquires;
- if (remain < 0 || comapreAndSetState(available, remain) {
- return remain;
- }
- }
- }
- protected boolean
- try ReleaseShared(int releases) {
- for (;;) {
- int available = getState();
- int next = available + releases;
- if (compareAndSetState(available, next) {
- return true;
- }
- }
- }
- }
- public SimpleResourceManager(int numOfResources) {
- synchronizer = new InnerSynchronizer(numOfResources);
- }
- public void acquire() throws InterruptedException {
- synchronizer.acquireSharedInterruptibly(1);
- }
- pubic void release() {
- synchronizer.releaseShared(1);
- }
- }
七、高级同步对象(提高开发效率)
atomic 和 locks 包提供的 Java 类可以满足基本的互斥和同步访问的需求,但这些 Java 类的抽象层次较低,使用比较复杂。
更简单的做法是使用 java.util.concurrent 包中的高级同步对象。
1、信号量。
信号量一般用来数量有限的资源,每类资源有一个对象的信号量,信号量的值表示资源的可用数量。
在使用资源时,需要从该信号量上获取许可,成功获取许可,资源的可用数 - 1;完成对资源的使用,释放许可,资源可用数 + 1; 当资源数为 0 时,需要获取资源的线程以阻塞的方式来等待资源,或过段时间之后再来检查资源是否可用。(上面的 SimpleResourceManager 类实际上时信号量的一个简单实现)
java.util.concurrent.Semaphore 类,在创建 Semaphore 类的对象时指定资源的可用数
a、acquire(),以阻塞方式获取许可
b、tryAcquire(),以非阻塞方式获取许可
c、release(),释放许可。
d、accquireUninterruptibly(),accquire() 方法获取许可以的过程可以被中断,如果不希望被中断,使用此方法。
- public class PrinterManager {
- private final Semphore semaphore;
- private final List < Printer > printers = new ArrayList < >() : public PrinterManager(Collection < ?extends Printer > printers) {
- this.printers.addAll(printers);
- //这里重载方法,第二个参数为true,以公平竞争模式,防止线程饥饿
- this.semaphore = new Semaphore(this.printers.size(), true);
- }
- public Printer acquirePrinter() throws InterruptedException {
- semaphore.acquire();
- return getAvailablePrinter();
- }
- public void releasePrinter(Printer printer) {
- putBackPrinter(pinter);
- semaphore.release();
- }
- private synchronized Printer getAvailablePrinter() {
- printer result = printers.get(0);
- printers.remove(0);
- return result;
- }
- private synchronized void putBackPrinter(Printer printer) {
- printers.add(printer);
- }
- }
2、倒数闸门
多线程协作时,一个线程等待另外的线程完成任务才能继续进行。
java.util.concurrent.CountDownLatch 类,创建该类时,指定等待完成的任务数;当一个任务完成,调用 countDonw(),任务数 - 1。等待任务完成的线程通过 await(),进入阻塞状态,直到任务数量为 0。CountDownLatch 类为一次性,一旦任务数为 0,再调用 await() 不再阻塞当前线程,直接返回。
例:
- public class PageSizeSorter {
- // 并发性能远远优于HashTable的 Map实现,hashTable做任何操作都需要获得锁,同一时间只有有个线程能使用,而ConcurrentHashMap是分段加锁,不同线程访问不同的数据段,完全不受影响,忘记HashTable吧。
- private static final ConcurrentHashMap < String,
- Interger > sizeMap = new ConcurrentHashMap < >();
- private static class GetSizeWorker implements Runnable {
- private final String urlString;
- public GetSizeWorker(String urlString, CountDownLatch signal) {
- this.urlString = urlStirng;
- this.signal = signal;
- }
- public void run() {
- try {
- InputStream is = new URL(urlString).openStream();
- int size = IOUtils.toByteArray(is).length;
- sizeMap.put(urlString, size);
- } catch(IOException e) {
- sizeMap.put(urlString, -1);
- } finally {
- signal.countDown() : //完成一个任务 , 任务数-1
- }
- }
- }
- private void sort() {
- List < Entry < String,
- Integer > list = new ArrayList < sizeMap.entrySet());
- Collections.slort(list, new Comparator < Entry < String, Integer >> () {
- public int compare(Entry < String, Integer > o1, Entry < Sting, Integer > o2) {
- return Integer.compare(o2.getValue(), o1.getValue());
- };
- System.out.println(Arrays.deepToString(list.toArray()));
- }
- public void sortPageSize(Collection < String > urls) throws InterruptedException {
- CountDownLatch sortSignal = new CountDownLatch(urls.size());
- for (String url: urls) {
- new Thread(new GetSizeWorker(url, sortSignal)).start();
- }
- sortSignal.await() : //主线程在这里等待,任务数归0,则继续执行
- sort();
- }
- }
3、循环屏障
循环屏障在作用上类似倒数闸门,不过他不像倒数闸门是一次性的,可以循环使用。另外,线程之间是互相平等的,彼此都需要等待对方完成,当一个线程完成自己的任务之后,等待其他线程完成。当所有线程都完成任务之后,所有线程才可以继续运行。
当线程之间需要再次进行互相等待时,可以复用同一个循环屏障。
类 java.uti.concurrent.CyclicBarrier 用来表示循环屏障,创建时指定使用该对象的线程数目,还可以指定一个 Runnable 接口的对象作为每次循环后执行的动作。(当最后一个线程完成任务之后,所有线程继续执行之前,被执行。如果线程之间需要更新一些共享的内部状态,可以利用这个 Runnalbe 接口的对象来处理)。
每个线程任务完成之后,通过调用 await 方法进行等待,当所有线程都调用 await 方法之后,处于等待状态的线程都可以继续执行。在所有线程中,只要有一个在等待中被中断,超时或是其他错误,整个循环屏障会失败,所有等待中的其他线程抛出 java.uti.concurrent.BrokenBarrierException。
例:每个线程负责找一个数字区间的质数,当所有线程完成后,如果质数数目不够,继续扩大范围查找
- public class PrimeNumber {
- private static final int TOTAL_COUTN = 5000;
- private static final int RANGE_LENGTH = 200;
- private static final int WORKER_NUMBER = 5;
- private static volatitle boolean done = false;
- private static int rangeCount = 0;
- private static final List < Long > results = new ArrayList < Long > () : private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable() {
- public void run() {
- if (results.size() >= TOTAL_COUNT) {
- done = true;
- }
- }
- });
- private static class PrimeFinder implements Runnable {
- public void run() {
- while (!done) { // 整个过程在一个 while循环下,await()等待,下次循环开始,会再次判断 执行条件
- int range = getNextRange();
- long start = rang * RANGE_LENGTH;
- long end = (range + 1) * RANGE_LENGTH;
- for (long i = start; i < end; i++) {
- if (isPrime(i)) {
- updateResult(i);
- }
- }
- try {
- barrier.await();
- } catch(InterruptedException | BokenBarrierException e) {
- done = true;
- }
- }
- }
- }
- private synchronized static void updateResult(long value) {
- results.add(value);
- }
- private synchronized static int getNextRange() {
- return rangeCount++;
- }
- private static boolean isPrime(long number) {
- //找质数的代码
- }
- public void calculate() {
- for (int i = 0; i < WORKER_NUMBER; i++) {
- new Thread(new PrimeFinder()).start();
- }
- while (!done) {
- }
- //计算完成
- }
- }
4、对象交换器
适合于两个线程需要进行数据交换的场景。(一个线程完成后,把结果交给另一个线程继续处理)
java.util.concurrent.Exchanger 类,提供了这种对象交换能力,两个线程共享一个 Exchanger 类的对象,一个线程完成对数据的处理之后,调用 Exchanger 类的 exchange() 方法把处理之后的数据作为参数发送给另外一个线程。而 exchange 方法的返回结果是另外一个线程锁提供的相同类型的对象。如果另外一个线程未完成对数据的处理,那么 exchange() 会使当前线程进入等待状态,直到另外一个线程也调用了 exchange 方法来进行数据交换。
例:
- public class SendAndReceiver {
- private final Exchanger < StringBuilder > exchanger = new Exchanger < StringBuilder > ();
- private class Sender implements Runnable {
- public void run() {
- try {
- StringBuilder content = new StringBuilder("Hello");
- content = exchanger.exchange(content);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- private class Receiver implements Runnable {
- public void run() {
- try {
- StringBuilder content = new StringBuilder("World");
- content = exchanger.exchange(content);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- public void exchange() {
- new Thread(new Sender()).start();
- new Thread(new Receiver()).start();
- }
- }
八、数据结构(多线程程序使用的高性能数据结构)
java.util.concurrent 包中提供了一些适合多线程程序使用的高性能数据结构,包括队列和集合类对象等。
1、队列
a、BlockingQueue 接口:线程安全的阻塞式队列;当队列已满时,想队列添加会阻塞;当队列空时,取数据会阻塞。(非常适合消费者 - 生产者模式)
阻塞方式:put()、take()。
非阻塞方式:offer()、poll()。
实现类:基于数组的固定元素个数的 ArrayBolockingQueue 和基于链表结构的不固定元素个数的 LinkedBlockQueue 类。
b、BlockingDeque 接口: 与 BlockingQueue 相似,但可以对头尾进行添加和删除操作的双向队列;方法分为两类,分别在队首和对尾进行操作。
实现类:标准库值提供了一个基于链表的实现,LinkedBlockgingDeque。
2、集合类
在多线程程序中,如果共享变量时集合类的对象,则不适合直接使用 java.util 包中的集合类。这些类要么不是线程安全,要么在多线程下性能比较差。
应该使用 java.util.concurrent 包中的集合类。
a、ConcurrentMap 接口: 继承自 java.util.Map 接口
putIfAbsent():只有在散列表不包含给定键时,才会把给定的值放入。
remove():删除条目。
replace(key,value):把 value 替换到给定的 key 上。
replace(key, oldvalue, newvalue):CAS 的实现。
实现类:ConcurrentHashMap:
创建时,如果可以预估可能包含的条目个数,可以优化性能。(因为动态调整所能包含的数目操作比较耗时,这个
HashMap 也一样,只是多线程下更耗时)。创建时,预估进行更新操作的线程数,这样实现中会根据这个数把内部空间划分为对应数量的部分。(默认是 16,如果只有一个线程进行写操作,其他都是读取,那么把值设为 1 可以提高性能)。
注:当从集合中创建出迭代器遍历 Map 元素时,不一定能看到正在添加的数据,只能和集合保证弱一致性。(当然使用迭代器不会因为查看正在改变的 Map,而抛出 java.util.ConcurrentModifycationException)
b、CopyOnWriteArrayList 接口:继承自 java.util.List 接口。
顾名思义,在 CopyOnWriteArrayList 的实现类,所有对列表的更新操作都会新创建一个底层数组的副本,并使用副本来存储数据;对列表更新操作加锁,读取操作不加锁。
适合多读取少修改的场景,如果更新操作多,那么不适合用,同样迭代器只能表示创建时列表的状态,更新后使用了新的底层数组,迭代器还是引用旧的底层数组。
九、多线程任务的执行
过去线程的执行,是先创建 Thread 类的想,再调用 start 方法启动,这种做法要求开发人员对线程进行维护,在线程较多时,一般创建一个线程池同一管理,同时降低重复创建线程的开销
在 J2SE5.0 中,java.util.concurrent 包提供了丰富的用来管理线程和执行任务的实现。
1、基本接口(描述任务)
a、Callable 接口:
Runnable 接口受限于 run 方法的类型签名,而 Callable 只有一个方法 call(),可以有返回值,可以抛出受检异常。
b、Future 接口:
过去,需要异步线程的任务执行结果,要求主线程和任务执行线程之间进行同步和数据传递。
Future 简化了任务的异步执行,作为异步操作的一个抽象。调用 get() 方法可以获取异步的执行结果,如果任务没有执行完,会等待,直到任务完成或被取消,cancel() 可以取消。
c、Delayed 接口:
延迟执行任务,getDelay() 返回当前剩余的延迟时间,如果不大于 0,说明延迟时间已经过去,应该调度并执行该任务。
2、组合接口(描述任务)
a、RunnableFuture 接口:继承自 Runnable 接口和 Future 接口。
当来自 Runnalbe 接口中的 run 方法成功执行之后,相当于 Future 接口表示的异步任务已经完成,可以通过 get() 获取运行结果。
b、ScheduledFuture 接口:继承 Future 接口和 Delayed 接口,表示一个可以调用的异步操作。
c、RunnableScheduledFuture 接口:继承自 Runnable、Delayed 和 Future,接口中包含 isPeriodic,表明该异步操作是否可以被重复执行。
3、Executor 接口、ExcutorServer 接口、ScheduleExecutorService 接口和 CompletionService 接口(描述任务执行)
a、executor 接口,execute() 用来执行一个 Runnable 接口的实现对象,不同的 Executor 实现采取不同执行策略,但提供的任务执行功能比较弱。
b、excutorServer 接口,继承自 executor;
提供了对任务的管理:submit(),可以吧 Callable 和 Runnable 作为任务提交,得到一个 Future 作为返回,可以获取任务结果或取消任务。
提供批量执行:invokeAll() 和 invokeAny(),同时提交多个 Callable;invokeAll(),会等待所有任务都执行完成,返回一个包含每个任务对应 Future 的列表;invokeAny(),任何一个任务成功完成,即返回该任务结果。
提供任务关闭:shutdown()、shutdownNow() 来关闭服务,前者不允许新的任务提交,后者试图终止正在运行和等待的任务,并返回已经提交单没有被运行的任务列表。(两个方法都不会等待服务真正关闭,只是发出关闭请求。)。
shutdownDow,通常做法是向线程发出中断请求,所以确保提交的任务实现了正确的中断处理逻辑。
c、ScheduleExecutorService 接口,继承自 excutorServer 接口:支持任务的延迟执行和定期执行,可以执行 Callable 或 Runnable。
schedule(),调度一个任务在延迟若干时间之后执行;
scheduleAtFixedRate():在初始延迟后,每隔一段时间循环执行;在下一次执行开始时,上一次执行可能还未结束。(同一时间,可能有多个)
scheduleWithFixedDelay:同上,只是在上一次任务执行完后,经过给定的间隔时间再开始下一次执行。(同一时间,只有一个)
以上三个方法都返回 ScheduledFuture 接口的实现对象。
d、CompletionService 接口,共享任务执行结果。
通常在使用 ExecutorService 接口,通过 submit 提交任务,并得到一个 Future 接口来获取任务结果,如果任务提交者和执行结果的使用者是程序的不同部分,那就要把 Future 在不同部分进行传递;而 CompletionService 就是解决这个问题,程序不同部分可以共享 CompletionService,任务提交后,执行结果可以通过 take(阻塞),poll(非阻塞)来获取。
标准库提供的实现是 ExecutorCompletionService,在创建时,需要提供一个 Executor 接口的实现作为参数,用来实际执行任务。
例:多线程方式下载文件
- public class FileDownloader {
- // 线程池
- private final ExecutorService executor = Executors.newFixedThreadPool(10);
- public boolean download(final URL url, final Path path) {
- Future < Path > future = executor.submit(new Callable < Path > () { //submit提交任务
- public Path call() {
- //这里就省略IOException的处理了
- InputStream is = url.openStream();
- Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);
- return path;
- });
- try {
- return future.get() != null ? true: false;
- } < span style = "font-family: Arial, Helvetica, sans-serif;" >
- catch(InterruptedException | ExecutionException e) { < /span>
- return false;
- }
- }
- public void close(){/ / 当不再使用FileDownloader类的对象时,应该使用close方法关闭其中包含的ExecutorService接口的实现对象,否则虚拟机不会退出,占用内存不释放executor.shutdown(); // 发出关闭请求,此时不会再接受新任务
- try {
- if (!executor.awaitTermination(3, TimeUnit.MINUTES)) { // awaitTermination 来等待一段时间,使正在执行的任务或等待的任务有机会完成
- executor.shutdownNow(); // 如果等待时间过后还有任务没完成,则强制结束
- executor.awaitTermination(1, TimeUnit.MINUTES); // 再等待一段时间,使被强制结束的任务完成必要的清理工作
- }
- } catch(InterruptedException e) {
- executor.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
- }
十、Java SE 7 新特性
对 java.util.concurrent 包进行更新,增加了新的轻量级任务执行框架 fork/join 和多阶段线程同步工具。
1、轻量级任务执行框架 fork/join
这个框架的目的主要是更好地利用底层平台上的多核和多处理器来进行并行处理。
通过分治算法或 map/reduce 算法来解决问题。
fork/join 类比于 map/reduce。
fork 操作是把一个大的问题划分为若干个较小的问题,划分过程一般为递归,直到可以直接进行计算的粒度适合的子问题;子问题在结算后,可以得到整个问题的部分解
join 操作收集子结果,合并,得到完整解,也可能是 递归进行的。
相对一般的线程池实现,F/J 框架的优势在任务的处理方式上。在一般线程池中,一个线程由于某些原因无法运行,会等待;而在 F/J,某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了等待时间,提高了性能。
为了 F/J 能高效,在每个子问题视线中应避免使用 synchronized 或其他方式进行同步,也不应使用阻塞式 IO 或过多访问共享变量。在理想情况下,每个子问题都应值进行 CPU 计算,只使用每个问题的内部对象,唯一的同步应只发生在子问题和创建它的父问题之间。(这完全就是 Hadoop 的 MapReduce 嘛)
a、ForkJoinTask 类:表示一个由 F/J 框架执行的任务,该类实现了 Future 接口,可以按照 Future 接口的方式来使用。(表示任务)
fork(),异步方式启动任务的执行。
join(),等待任务完成并返回执行结果。
在创建自己的任务时,最好不要直接继承自 ForkJoinTask,而是继承其子类,RecuriveTask 或 RecursiveAction,前者可以返回结果,后者不行。
b、ForkJoinPool 类:表示任务执行,实现了 ExecutorService 接口,除了可以执行 ForkJoinTask,也可以执行 Callable 和 Runnable。(任务执行)
执行任务的两大类:
第一类:execute、invoke 或 submit 方法:直接提交任务。
第二类:fork():运行 ForkJoinTask 在执行过程中的子任务。
一般作法是表示整个问题的 ForkJoinTask 用第一类提交,执行过程中产生的子任务不需要处理,ForkJoinPool 会负责子任务执行。
例:查找数组中的最大值
- private static class MaxValueTask extends RecursiveTask < Long > {
- private final long[] array;
- private final int start;
- private final int end;
- MaxValueTask(long[] array, int start, int end) {
- this.array = array;
- this.start = start;
- this.end = end;
- }
- //compute是RecursiveTask的主方法
- protected long compute() {
- long max = Long.MIN_VALUE;
- if (end - start < RANG_LENGTH) { //寻找最大值
- for (int i = start; i < end; i++{
- if (array[i] > max) {
- max = array[i];
- }
- }
- } else { // 二分任务
- int mid = (start + end) / 2;
- MaxValueTask lowTask = new MaxValueTask(array, start, mid);
- MaxValueTask highTask = new MaxValueTask(array, mid, end);
- lowTask.fork(); // 异步启动任务
- highTask.fork();
- max = Math.max(max, lowTask.join()); //等待执行结果
- max = Math.max(max, highTask.join();
- }
- return max;
- }
- public Long calculate(long[] array) {
- MaxValueTask task = new MaxValueTask(array, 0, array.length);
- Long result = forkJoinPool.invoke(task);
- return result;
- }
- }
注:这个例子是示例,但从性能上说直接对整个数组顺序比较效率高,毕竟多线程所带来的额外开销过大。
在实际中,F/J 框架发挥作用的场合很多,比如在一个目录包含的所有文本中搜索某个关键字,可以每个文件创建一个子任务。
如果相关的功能可以用递归和分治来解决,就适合 F/J。
2、多阶段线程同步工具
Phaser 类是 Java SE 7 中新增的一个使用同步工具,功能和灵活性比倒数闸门和循环屏障要强很多。
在 F/J 框架中的子任务之间要进行同步时,应优先考虑 Phaser。
Phaser 把多个线程写作执行的任务划分成多个阶段(phase),编程时要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段,当一个阶段中所有线程都成功完成之后,Phaser 的 onAdvance() 被调用,可以通过覆盖添加自定义处理逻辑(类似循环屏障的使用的 Runnable 接口),然后 Phaser 类会自动进入下个阶段。如此循环,知道 Phaser 不再包含任何参与者。
Phaser 创建后,初始阶段编号为 0,构造函数中指定初始参与个数。
register(),bulkRegister(),动态添加一个或多个参与者。
arrive(),某个参与者完成任务后调用
arriveAndDeregister(),任务完成,取消自己的注册。
arriveAndAwaitAdvance(),自己完成等待其他参与者完成。,进入阻塞,直到 Phaser 成功进入下个阶段。
awaitAdvance()、awaitAdvanceInterruptibly(),等待 phaser 进入下个阶段,参数为当前阶段的编号,后者可以设置超时和处理中断请求。
另外,Phaser 的一个重要特征是多个 Phaser 可以组成树形结构,Phaser 提供了构造方法来指定当前对象的父对象;当一个子对象参与者 > 0,会自动注册到父对象中;当 = 0,自动解除注册。
例:从指定网址,下载 img 标签的照片
阶段
1、处理网址对应的 html 文本,和抽取 img 的链接;
2、创建图片下载子线程,主线程等待;
3、子线程下载图片,主线程等待;
4、任务完成退出
- public class WebPageImageDownloader {
- private final Phaser phaser = new Phaser(1); //初始参与数1,代表主线程。
- public void download(URL url, final Path path) throws IOException {
- String content = getContent(url); //获得HTML文本,省略。
- List < URL > imageUrls = extractImageUrls(content); //获得图片链接,省略。
- for (final URL imageUrl: imageUrls) {
- phaser.register(); //子线程注册
- new Thread() {
- public void run() {
- phaser.arriveAndAwaitAdvance(); //第二阶段的等待,等待进入第三阶段
- try {
- InputStream is = imageUrl.openStream();
- File.copy(is, getSavePath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);
- } catch(IOException e) {
- e.printStackTrace() :
- } finally {
- phaser.arriveAndDeregister(); //子线程完成任务,退出。
- }
- }
- }.start();
- }
- phaser.arriveAndAwaitAdvance(); //第二阶段等待,子线程在注册
- phaser.arriveAndAwaitAdvance(); //第三阶段等待,子线程在下载
- phaser.arriveAndDeregister(); //所有线程退出。
- }
- }
十一、ThreadLocal 类
java.lang.ThreadLocal,线程局部变量,把一个共享变量变为一个线程的私有对象。不同线程访问一个 ThreadLocal 类的对象时,锁访问和修改的事每个线程变量各自独立的对象。通过 ThreadLocal 可以快速把一个非线程安全的对象转换成线程安全的对象。(同时也就不能达到数据传递的作用了)。
a、get() 和 set() 分别用来获取和设置当前线程中包含的对象的值。
b、remove(),删除。
c、initialValue(),初始化值。如果没有通过 set 方法设置值,第一个调用 get,会通过 initValue 来获取对象的初始值。
ThreadLoacl 的一般用法,创建一个 ThreadLocal 的匿名子类并覆盖 initalValue(),把 ThreadLoacl 的使用封装在另一个类中
- public class ThreadLocalIdGenerator {
- private static final ThreadLocal < IdGenerator > idGenerator = new ThreadLocal < IdGenerator > () {
- protected IdGenerator initalValue() {
- return new IdGenerator(); //IdGenerator 是个初始int value =0,然后getNext(){ return value++}
- }
- };
- public static int getNext() {
- return idGenerator.get().getNext();
- }
- }
ThreadLoal 的另外一个作用是创建线程唯一的对象,在有些情况,一个对象在代码中各个部分都需要用到,传统做法是把这个对象作为参数在代码间传递,如果使用这个对 I 昂的代码都在同一个线程,可以封装在 ThreadLocal 中。
如:在多线程中,生成随机数
java.util.Random 会带来竞争问题,java.util.concurrent.ThreadLocalRandom 类提供多线程下的随机数声场,底层是 ThreadLoacl。
总结:
多线程开发中应该优先使用高层 API,如果无法满足,使用 java.util.concurrent.atomic 和 java.util.concurrent.locks 包提供的中层 API,而 synchronized 和 volatile,以及 wait,notify 和 notifyAll 等低层 API 应该最后考虑。
以上所述是小编给大家介绍的 Java 多线程并发编程,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 PHPERZ 网站的支持!
来源: http://www.phperz.com/article/17/1229/356872.html