上周的面试中,被问及了几个关于 Java 并发编程的问题,自己回答的都不是很系统和全面,可以说是 "头皮发麻",哈哈。因此果断购入《Java 并发编程的艺术》一书,学习后的体会是要想快速上手 Java 并发编程,最需要掌握的是线程、线程池概念的理解和 Executor 框架的使用。
Tip:
实践请见 github-multiThread,不会介绍 Java 内存模型等更底层的内容。看看下图的 "糙汉" 身上错综复杂的线 [程],愿通过学习,能化繁为简,[高效] 的编出 [高效] 的多线程代码。
在实践中,为了更好的利用资源提高系统整体的吞吐量,会选择并发编程。但由于上下文切换和死锁等问题,并发编程不一定能提高性能,因此如何合理的进行并发编程时本文的重点,接下来介绍关于锁最基本的一些知识(选学)。
的代码。
- monitorenter, monitorexit
指令减少自旋带来的开销;只能保证一个共享变量的原子操作,通过
- pause
保证引用对象间的原子性,接下来看一个最简单的 CAS 操作示例。
- AtomicRefence
- protected void safeCount() {
- for (;;) {
- int i = atomicI.get();
- if (atomicI.compareAndSet(i, ++i)) break;
- }
- }
这部分和之后的锁是基础部分的核心内容,需要好好理解。一般来说,线程都是操作系统最小的调度单元,一个进程中可以包含多个线程,每个线程都拥有自己的计数器、堆栈和局部变量。系统会采用分时的形式调度运行的线程,OS 会分出一个个的时间片到线程,此外还可以给线程设置优先级,来保证优先级高的线程获得更多的 CPU 时间。通过下面的示例代码可以发现,java 程序的运行不仅就是 main 线程,还有清楚 Reference 的线程、调用对象 finalize 方法的线程、分发处理发送给 JVM 信息的线程、Attach Listener 线程等。
- // 获取管理线程的MXbean
- ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
- ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
- // 打印线程信息
- for (ThreadInfo threadInfo: threadInfos) {
- System.out.println("[" + threadInfo.getThreadId() + "]" + threadInfo.getThreadName());
- }
初始状态,线程被构建但未 start;
- NEW
运行状态,Java 线程将 OS 中的就绪和运行两种状态都称作 "运行中";
- RUNNABLE
阻塞状态,表示线程阻塞于锁;
- BLOCKED
等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出特定动作(通知或中断);
- WAITING
超时等待状态,该状态不同于
- TIME_WAITING
,其会在指定的时候后返回;
- WAITING
终止状态,可以使用
- TERMINATED
合理的终止线程,表示当前线程已经执行完毕,之后通过一张 Java 线程状态图来做个形象的了解。
- interrupt()
- //等待方:1.获取对象的锁 2.如果条件不满足,那么调用对象的wait方法,被通知后要检查条件
- //3.条件满足则执行对应的逻辑
- synchronized(lock) {
- while (!flag) {
- lock.wait();
- }
- }
- //通知方:1.获取对象的锁 2.改变条件 3.通知所有等待在对象上线程
- synchronized(lock) {
- flag = true;
- lock.notifyAll();
- }
线程变量是以
- ThreadLocal
对象为键,任意对象为值的存储结构。此外,这部分常见的应用实例包括等待超时模式,数据库线程池,基于线程池的简单 web 服务器等。
- ThreadLocal
锁是用来控制多个线程访问共享资源的方式,在 Lock 接口出现前都是通过
来处理线程间同步问题。锁的主要方法包括
- synchronized
,
- lock
,
- tryLock
,
- unlock
获取等待通知组件等方法。其相关的实现包括队列同步器
- newCondition
、重入锁
- AbstractQueuedSynchronizer
、读写锁
- ReentrantLock
、LockSupport 和 Condition 接口,这部分的重点讲是可重入锁 ReenterLock。
- ReentrantReadWriteLock
表示该锁可以支持一个线程对资源的重复加锁,并支持获取琐时的公平性的选择。默认是非公平锁,其特点是性能要远高于公平锁(严格按照请求时间顺序获取所,FIFO)。
- ReentrantLock
- ReentrantLock lock = new ReentrantLock(true);
- lock.lock();
- try {
- // TODO
- } finally {
- lock.unlock();
- }
同时维护一个读锁和一个写锁,允许多个读线程同时访问共享数据,只会在写线程访问时阻塞,和数据库的锁机制很类似,该方式使得并发性等到很大提升。其除了公平性选择、可重入等特性外,还支持锁降级,遵循获取写锁、获取读锁再释放写锁的次序,写锁能降级为读锁。
- ReentrantReadWriteLock
阻塞,
- park
唤醒的静态方法。
- unpark
、
- wait()
等,这些方法与
- notify()
关键字配合可以实现等待 / 通知模式,
- synchronized
接口也提供了类似的监视器方法,但功能更加强大。
- Condition
对
- Segment
进行包装,达到了记录级别的锁粒度,和数据库相关知识类似。HashTable 由于只支持 [表] 级锁,因此性能比较低下。
- HashEntry
则是队列的线程安全版本,没有什么特别要说的。
- ConcurrentLinkedQueue
,
- ArrayBlockingQueue
,
- LinkedBlockingQueue
等,不是重点。
- DelayQueue
算法,可以使得线程可以从其他队列里窃取任务来执行,优点是充分利用线程进行并行计算,减少了线程间的竞争;缺点是在某些情况下存在竞争,比如双端队列里只有一个任务时,该算法会消耗更多的系统资源。
- work-stealing
这部分的内容非常重要,之后介绍的一些常见模式可以很好的应用在日常的开发场景中,一定要掌握牢靠。
和
- AtomicBoolean
,
- AtomicInteger
,
- AtomicIntegerArray
等,接下来选择一个比较复杂的作为示例。
- AutomicReference
- User user = new User("xionger", 30);
- atomicUserRef.set(user);
- User updateUser = new User("xiongerda", 32);
- atomicUserRef.compareAndSet(user, updateUser);
- System.out.println(atomicUserRef.get().getName());
- System.out.println(atomicUserRef.get().getOld());
- static CountDownLatch latch = new CountDownLatch(3);
- public static void main(String[] args) throws InterruptedException {
- new Thread(new Runnable() {@Override public void run() {
- latch.countDown();
- }
- }).start();
- new Thread(new Runnable() {@Override public void run() {
- latch.countDown();
- }
- }).start();
- latch.await();
- }
有些相似,不过其特点是可以使用
- CountDownLatch
方法重置,并通过
- reset
判断线程是否中断。
- isBroken()
- private static ExecutorService executorService = Executors.newFixedThreadPool(50);
- private static Semaphore sema = new Semaphore(15);
- public static void main(String[] args) {
- for (int i = 0; i < 50; i++) {
- executorService.execute(new Runnable() {@Override public void run() {
- try {
- sema.acquire();
- System.out.println("save data");
- sema.release();
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- executorService.shutdown();
- }
- private static final Exchanger < String > exchanger = new Exchanger < >();
- private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
- public static void main(String[] args) {
- threadPool.execute(new Runnable() {@Override public void run() {
- String a = "银行流水A";
- try {
- exchanger.exchange(a);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadPool.execute(new Runnable() {@Override public void run() {
- String b = "银行流水B";
- try {
- String a = exchanger.exchange(b);
- System.out.println("a和b是否数据一致:" + a.equals(b) + ",a录入的是: " + a + ",b录入的是" + b);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
时,尽可能的避免了获取全局锁,大部分的可能都会执行步骤 2,而无需获取全局锁。 在引入 Executor 框架前,Java 线程既是工作单元,也是执行机制。而在 Executor 框架中,工作单元和执行机制被分离开来,前者包括
- execute()
和
- Runnable
,而执行机制由 Executor 框架提供。该框架是一个两级的调度模型,在上层,通过调度器 Executor 将多个任务映射到固定数量的线程;在底层,操作系统内核将这些线程再映射到处理器上。而我们的应用程序只需通过 E 该框架控制上层的调度即可。 Tip: 在合理配置线程池时,需要根据具体场景给出对应的解决方案,总体来说,推荐使用有界队列,便于控制。 CPU 密集型:配置尽可能少的线程,如
- Callable
,可以通过
- cpu数量+1
获取 CPU 个数 IO 密集型:配置尽可能多的线程,如
- Runtime.getRuntime().availableProcessors()
,常见场景,等待数据库或服务接口的返回。 优先级:可以通过
- 2*cpu数量
来处理 监控:可以通过
- PriorityBlockingQueue
,
- taskCount
,
- completedTaskCount
等函数来监控线程池的运行。
- getActiveSize
和
- Runnable
- Callable
和其子类
- Executor
,相关的实现类包括
- ExecutorService
和
- ThreadPoolExecutor
。
- ScheduledThreadPoolExecutor
和其实现
- Future
。
- FutureTask
,
- corePool
,
- maximumPool
,
- BlockingQueue
4 部分组成,可以由工具类
- RejectedExecutionHandler
创建。具体老说,工具类可以创建
- Executors
固定线程数(最推荐)、
- FixedThreadPool
、
- SingleThreadExecutor
三种类型的
- CachedThreadPool
。
- ThreadPoolExecutor
对象更加全面,其通过
- Timer
来执行周期性或定时的任务。
- DelayQueue
(AQS),之前介绍的
- AbstractQueuedSynchronizer
、
- ReentrantLock
等其实都是基于 AQS 来实现的。AQS 是一个同步框架,提供通用机制来原子性的管理同步状态、阻塞 & 唤醒线程、维护被阻塞的线程队列。每个基于 AQS 的实现都会包含两类操作,acquire 用于阻塞调用线程,对应
- CountDownLatch
,知道 AQS 状态允许这个线程才能继续执行;另一个为 release,对应
- futureTask.get()
,该操作改变 AQS 状态,改变后的状态允许一个或多个阻塞线程解除阻塞。
- futureTask.cancel()&run()
- public static void main(String[] args) throws InterruptedException,
- ExecutionException {
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future < BigDecimal > result = executor.submit(new Callable < BigDecimal > () {@Override public BigDecimal call() throws Exception {
- return getSalaryByService();
- }
- });
- System.out.println(result.get());
- }
命令查看进程的情况,之后可以使用交互命令
- top
查看 CPU 性能,
- 1
查看每个线程的性能信息。 性能测试:比如使用 Jmeter 来做压测,可以通过
- H
来查看数据的压力情况。
- netstat -nat | grep 3306 -c
参考资料
来源: http://www.cnblogs.com/wanliwang01/p/javacore_multiThread.html