一, 传统通信
- public static void main(String[] args) {
- //volatile 实现两个线程间数据可见性
- private volatile static List list = new ArrayList();
- Thread t1 = new Thread(new Runnable() { // (1)
- public void run() {
- try {
- for(int i = 0; i <10; i++){
- list.add(i);
- System.out.println(Thread.currentThread().getName() + "线程添加第" + (i + 1) + "个元素..");
- Thread.sleep(500);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "t1");
- Thread t2 = new Thread(new Runnable() { // (2)
- public void run() {
- while(true){
- if(list.size() == 5){
- //do something
- throw new RuntimeException(Thread.currentThread().getName() +
- "线程接到通知 size =" + list.size() + "线程停止..");
- }
- }
- }
- }, "t2");
- t1.start();
- t2.start();
- }
t1 线程不断将生产的数据放入 list 集合中
t2 线程开启 while 循环监听 t1 线程, 虽然可以实现 list.size()==5 时实时通知 t2 线程, 但太浪费性能, 考虑用 await/notify 提高性能, 程序执行结果如下:
t1 线程添加第 1 个元素..
t1 线程添加第 2 个元素..
t1 线程添加第 3 个元素..
t1 线程添加第 4 个元素..
t1 线程添加第 5 个元素..
Exception in thread "t2" java.lang.RuntimeException: t2 线程接到通知 size = 5 线程停止..
- at com.github.binarylei.thread._2_1conn.ListAdvice1$2.run(ListAdvice1.java:35)
- at java.lang.Thread.run(Thread.java:745)
t1 线程添加第 6 个元素..
t1 线程添加第 7 个元素..
t1 线程添加第 8 个元素..
t1 线程添加第 9 个元素..
t1 线程添加第 10 个元素..
二, wait/notify 实现通信
- /**
- * 使用 wait/notify 方法实现线程单挑通信(注意这两个方法是 Object 类的方法)
- * 1. wait 和 notity 必须配合 synchronized 关键字使用
- * 2. wait 方法 (关闭线程) 释放锁, notify(唤醒线程)方法不释放锁
- * 缺点: 通知不实时, 使用 CountDownLatch 实现实时通知
- */
- public static void main(String[] args) {
- private volatile static List list = new ArrayList();
- final Object lock = new Object();
- Thread t1 = new Thread(new Runnable() { // (1)
- public void run() {
- try {
- synchronized (lock) {
- System.out.println("t1 启动..");
- for(int i = 0; i <10; i++){
- list.add(i);
- System.out.println(Thread.currentThread().getName() + "线程添加第" + (i + 1) + "个元素..");
- Thread.sleep(500);
- if(list.size() == 5){
- System.out.println("已经发出通知..");
- lock.notify();
- }
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "t1");
- Thread t2 = new Thread(new Runnable() { // (2)
- public void run() {
- synchronized (lock) {
- System.out.println("t2 启动..");
- if(list.size() != 5){
- try {
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //do something
- throw new RuntimeException(Thread.currentThread().getName() +
- "线程接到通知 size =" + list.size() + "线程停止..");
- }
- }
- }, "t2");
- }
t1 线程当 list.size()==5 时 lock.notify() 唤醒 t2 线程, 注意 wait/notify 必须配合 synchronized 使用.
t2 线程调用 lock.wait() 后处于一直阻塞状态, 直到 t1 线程调用 lock.notify() 唤醒该线程, 倘若没有线程唤醒 t2 线程, 那么 t2 线程就一直处于阻塞状态. 本例中若 t1 线程先启动, 那么 t2 线程调用 lock.wait() 就永远阻塞无法执行. 程序执行结果如下:.
t2 启动..
t1 启动..
t1 线程添加第 1 个元素..
t1 线程添加第 2 个元素..
t1 线程添加第 3 个元素..
t1 线程添加第 4 个元素..
t1 线程添加第 5 个元素..
已经发出通知..
t1 线程添加第 6 个元素..
t1 线程添加第 7 个元素..
t1 线程添加第 8 个元素..
t1 线程添加第 9 个元素..
t1 线程添加第 10 个元素..
Exception in thread "t2" java.lang.RuntimeException: t2 线程接到通知 size = 10 线程停止..
- at com.github.binarylei.thread._2_1conn.ListAdd2$2.run(ListAdd2.java:51)
- at java.lang.Thread.run(Thread.java:745)
由于 t1 线程 lock.notify() 后不会释放锁, t2 线程虽然被唤醒但不能获取锁, 所以通知就不那么实时, 只有等 t1 线程执行完成释放锁后 t2 线程才能获得锁执行相应操作, 解决方案: 使用 CountDownLatch
三, CountDownLatch 实现实时通信
- public static void main(String[] args) {
- private volatile static List list = new ArrayList();
- final CountDownLatch countDownLatch = new CountDownLatch(1); // (1)
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- try {
- System.out.println("t1 启动..");
- for(int i = 0; i <10; i++){
- list.add(i);
- System.out.println(Thread.currentThread().getName() + "线程添加第" + (i + 1) + "个元素..");
- Thread.sleep(500);
- if(list.size() == 5){
- System.out.println("已经发出通知..");
- countDownLatch.countDown(); // (2)
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "t1");
- Thread t2 = new Thread(new Runnable() {
- public void run() {
- System.out.println("t2 启动..");
- if(list.size() != 5){
- try {
- countDownLatch.await(); // (3)
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //do something
- throw new RuntimeException(Thread.currentThread().getName() +
- "线程接到通知 size =" + list.size() + "线程停止..");
- }
- }, "t2");
- t1.start();
- t2.start();
- }
CountDownLatch 同步工具类, 允许一个或多个线程一直等待, 直到其他线程的操作执行完后再执行, 参数 1 表示需要等待的线程数量, 具体来说就是参数为几就必须调用几次
- countDownLatch.countDown()
- countDownLatch.countDown()
唤醒线程
countDownLatch.await()
阻塞线程, 程序执行结果如下:
t1 启动..
t1 线程添加第 1 个元素..
t2 启动..
t1 线程添加第 2 个元素..
t1 线程添加第 3 个元素..
t1 线程添加第 4 个元素..
t1 线程添加第 5 个元素..
已经发出通知..
Exception in thread "t2" java.lang.RuntimeException: t2 线程接到通知 size = 5 线程停止..
t1 线程添加第 6 个元素..
- at com.github.binarylei.thread._2_1conn.ListAdd3$2.run(ListAdd3.java:47)
- at java.lang.Thread.run(Thread.java:745)
t1 线程添加第 7 个元素..
t1 线程添加第 8 个元素..
t1 线程添加第 9 个元素..
t1 线程添加第 10 个元素..
四, ThreadLocal
ThreadLocal 是线程局部变量, 是一种多线程间并发访问变量的无锁解决方案.
ThreadLocal 和 synchronized 比较?
与 synchronized 等加锁的方式不同, ThreadLocal 完全不提供锁, 而使用以空间换时间的手段, 为每个线程提供变量的独立副本, 以保障线程安全.
从性能上说, ThreadLocal 不具有绝对的优势, 在并发不是很高的时候, 加锁的性能会更好, 但作为一套无锁的解决方案, 在高并发量或者竞争激烈的场景, 使用 ThreadLocal 可以在一定程度上减少锁竞争.
- public static void main(String[] args) throws InterruptedException {
- final ThreadLocal<String> th = new ThreadLocal<String>();
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- th.set("张三");
- System.out.println(th.get()); // => "张三"
- }
- }, "t1");
- Thread t2 = new Thread(new Runnable() {
- public void run() {
- try {
- Thread.sleep(1000);
- th.set("李四");
- System.out.println(th.get()); // => "李四"
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "t2");
- t1.start(); //t1: 张三
- t2.start(); //t2: 李四
- }
五, 自定义同步类窗口 - Queue
Java 提供了一些同步类容器, 它们是 线程安全 的, 如 Vector,HashTable 等. 这些同步类容器是由
Collections.synchronizedMap
等工厂方法去创建实现的, 底层使用 synchronized 关键字, 每次只有一个线程访问容器. 下面实现一个自己的同步类窗口.
- import java.util.LinkedList;
- public class MyQueue {
- private LinkedList list = new LinkedList();
- private int max = 5;
- private int min = 1;
- private Object lock = new Object();
- public void put(Object obj) { // (1)
- synchronized (lock) {
- while (list.size() == max) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- ;
- }
- }
- list.add(obj);
- lock.notify();
- System.out.println("put 元素:" + obj);
- }
- }
- public Object take() { // (2)
- Object obj;
- synchronized (lock) {
- while (list.size() == min) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- ;
- }
- }
- obj = list.removeFirst();
- lock.notify();
- System.out.println("take 元素:" + obj);
- }
- return obj;
- }
- }
测试
- public static void main(String[] args) {
- final MyQueue myQueue = new MyQueue();
- myQueue.put("a");
- myQueue.put("b");
- myQueue.put("c");
- myQueue.put("d");
- myQueue.put("e");
- new Thread(new Runnable() {
- @Override
- public void run() {
- myQueue.put("f");
- myQueue.put("g");
- myQueue.put("h");
- myQueue.put("i");
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- myQueue.take();
- myQueue.take();
- myQueue.take();
- }
- }).start();
- }
每天用心记录一点点. 内容也许不重要, 但习惯很重要!
来源: http://www.bubuko.com/infodetail-2590220.html