java 中常见的线程安全模块
同步容器. 它的原理是将状态封装起来, 并对每个公有方法都实行同步, 使得每次只有 1 个线程能够访问容器的状态.
Vector 和 HashTable
Collections.synchronizedXXX 方法
同步容器的问题
这种方式使得对容器的访问都串行化, 严重降低了并发性, 如果多个线程来竞争容器的锁时, 吞吐量严重降低
对容器的多个方法的复合操作, 是线程不安全的, 比如一个线程负责删除, 另一个线程负责查询, 有可能出现越界的异常
并发容器. java.util.concurrent 包里面的一系列实现
Concurrent 开头系列. 以 ConcurrentHashMap 为例, 它的实现原理为分段锁. 默认情况下有 16 个, 每个锁守护 1/16 的散列数据, 这样保证了并发量能达到 16
分段锁缺陷在于虽然一般情况下只要一个锁, 但是遇到需要扩容等类似的事情, 只能去获取所有的锁
ConcurrentHashMap 一些问题
需要对整个容器中的内容进行计算的方法, 比如 size,isEmpty,contains 等等. 由于并发的存在, 在计算的过程中可能已进过期了, 它实际上就是个估计值, 但是在并发的场景下, 需要使用的场景是很少的.
以 ConcurrentHashMap 的 size 方法为例:
- /**
- * Returns the number of key-value mappings in this map. If the
- * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
- * <tt>Integer.MAX_VALUE</tt>.
- *
- * @return the number of key-value mappings in this map
- */
- public int size() {
- // 为了能够算准数量, 会算 2 次, 如果两次算的不准, 就锁住再算
- final Segment<K,V>[] segments = this.segments;
- int size;
- boolean overflow; // true if size overflows 32 bits
- long sum; // sum of modCounts
- long last = 0L; // previous sum
- int retries = -1; // 第一轮的计算总数不重试
- try {
- for (;;) {
- if (retries++ == RETRIES_BEFORE_LOCK) {
- //RETRIES_BEFORE_LOCK 默认是 2
- for (int j = 0; j <segments.length; ++j)
- ensureSegment(j).lock(); // force creation
- }
- sum = 0L;
- size = 0;
- overflow = false;
- for (int j = 0; j < segments.length; ++j) {
- Segment<K,V> seg = segmentAt(segments, j);
- if (seg != null) {
- sum += seg.modCount;
- int c = seg.count;
- if (c <0 || (size += c) < 0)
- overflow = true;
- }
- }
- // 第一次计算的时候
- if (sum == last)
- break; // 如果前后两次数数一致, 就认为已经算好了
- last = sum;
- }
- } finally {
- if (retries> RETRIES_BEFORE_LOCK) {
- for (int j = 0; j <segments.length; ++j)
- segmentAt(segments, j).unlock();
- }
- }
- return overflow ? Integer.MAX_VALUE : size;
- }
复制代码
不能提供线程独占的功能
CopyOnWrite 系列. 以 CopyOnWriteArrayList 为例, 只在每次修改的时候, 进行加锁控制, 修改会创建并重新发布一个新的容器副本, 其它时候由于都是事实上不可变的, 也就不会出现线程安全问题
CopyOnWrite 的问题
每次修改都复制底层数组, 存在开销, 因此使用场景一般是迭代操作远多于修改操作
CopyOnWriteArrayList 的读写示例
- /**
- * Appends the specified element to the end of this list.
- *
- * @param e element to be appended to this list
- * @return <tt>true</tt> (as specified by {@link Collection#add})
- */
- public boolean add(E e) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- Object[] elements = getArray();
- int len = elements.length;
- Object[] newElements = Arrays.copyOf(elements, len + 1);
- newElements[len] = e;
- setArray(newElements);
- return true;
- } finally {
- lock.unlock();
- }
- }
- /**
- * {@inheritDoc}
- *
- * @throws IndexOutOfBoundsException {@inheritDoc}
- */
- public E get(int index) {
- return get(getArray(), index);
- }
- /**
- * Gets the array. Non-private so as to also be accessible
- * from CopyOnWriteArraySet class.
- */
- final Object[] getArray() {
- return array;
- }
- private E get(Object[] a, int index) {
- return (E) a[index];
- }
复制代码
java 中的同步工具类
阻塞队列, BlockingQueue. 它提供了 put 和 take 方法, 在队列不满足各自条件时将产生阻塞
BlockingQueue 使用示例, 生产者 - 消费者
- public static void main(String[] args) throws Exception {
- BlockingQueue queue = new ArrayBlockingQueue(1024);
- Producer producer = new Producer(queue);
- Consumer consumer = new Consumer(queue);
- new Thread(producer).start();
- new Thread(consumer).start();
- }
- }
- public class Producer implements Runnable{
- protected BlockingQueue queue = null;
- public Producer(BlockingQueue queue) {
- this.queue = queue;
- }
- public void run() {
- try {
- queue.put("1");
- Thread.sleep(1000);
- queue.put("2");
- Thread.sleep(2000);
- queue.put("3");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- public class Consumer implements Runnable{
- protected BlockingQueue queue = null;
- public Consumer(BlockingQueue queue) {
- this.queue = queue;
- }
- public void run() {
- try {
- System.out.println(queue.take());
- System.out.println("Wait 1 sec");
- System.out.println(queue.take());
- System.out.println("Wait 2 sec");
- System.out.println(queue.take());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
复制代码
输出为
- 1
- Wait 1 sec
- 2
- Wait 2 sec
- 3
复制代码
闭锁
CountDownLatch. 使多个线程等待一组事件发生, 它包含一个计数器, 表示需要等待的事件的数量, 每发生一个事, 就递减一次, 当减为 0 时, 所有事情发生, 允许 "通行"
CountDownLatch 示例:
- public class TestHarness{
- public long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
- final CountDownLatch startGate = new CountDownLatch(1);
- final CountDownLatch endGate = new CountDownLatch(nThreads);
- for (int i=0;i<nThreads;i++){
- Thread t = new Thread(){
- public void run(){
- try {
- startGate.await();
- try {
- task.run();
- }finally {
- endGate.countDown();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- t.start();
- }
- long start = System.nanoTime();
- startGate.countDown();
- endGate.await();
- long end=System.nanoTime();
- return end-start;
- }
- }
复制代码
启动门使主线程能够同时释放所有的工作线程, 结束门使得主线程能够等待最后一个线程执行完
FutureTask.Future.get 的如果任务执行完成, 则立即返回, 否则将阻塞直到任务完结, 再返回结果或者是抛出异常
信号量, Semaphore . 它管理着一组虚拟的许可, 许可的数量可通过构造函数指定, 在执行操作时首先获得许可, 并在使用后释放许可, 如果没有, 那么 accquire 将阻塞直到有许可.
Semaphore 示例
- public class BoundedHashSet<T>{
- private final Set<T> set;
- private final Semaphore sem;
- public BoundedHashSet(int bound) {
- this.set = Collections.synchronizedSet(new HashSet<T>());
- this.sem = new Semaphore(bound);
- }
- public boolean add(T o) throws InterruptedException {
- sem.acquire();
- boolean wasAdded = false;
- try {
- wasAdded = set.add(o);
- return wasAdded;
- }finally {
- if (!wasAdded){
- sem.release();
- }
- }
- }
- public boolean remove(Object o){
- boolean wasRemoved = set.remove(o);
- if(wasRemoved){
- sem.release();
- }
- return wasRemoved;
- }
- }
复制代码
栅栏. 它能阻塞一组线程直到某个事件发生. 与闭锁的区别:
所有线程必须同时到达栅栏位置, 才能继续执行. 闭锁用于等待事件, 而栅栏用于等待其它线程.
闭锁一旦进入终止状态, 就不能被重置, 它是一次性对象, 而栅栏可以重置
CyclicBarrier. 可以使一定数量的参与方反复地在栅栏位置汇集
CyclicBarrier 使用示例
- public static void main(String[] args) {
- // 第 k 步执行完才能执行第 k+1 步
- CyclicBarrier barrier = new CyclicBarrier(3,new StageKPlusOne());
- StageK[] stageKs = new StageK[3];
- for (int i=0;i<3;i++){
- stageKs[i] = new StageK(barrier,"k part"+(i+1));
- }
- for (int i=0;i<3;i++){
- new Thread(stageKs[i]).start();
- }
- }
- class StageKPlusOne implements Runnable{
- @Override
- public void run() {
- System.out.println("stage k over");
- System.out.println("stage k+1 start counting");
- }
- }
- class StageK implements Runnable{
- private CyclicBarrier barrier;
- private String stage;
- public StageK(CyclicBarrier barrier, String stage) {
- this.barrier = barrier;
- this.stage = stage;
- }
- @Override
- public void run() {
- System.out.println("stage"+stage+"counting...");
- try {
- TimeUnit.MILLISECONDS.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("stage"+stage+"count over");
- try {
- barrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
复制代码
输出为
- stage k part 1 counting...
- stage k part 3 counting...
- stage k part 2 counting...
- stage k part 2 count over
- stage k part 3 count over
- stage k part 1 count over
- stage k over
- stage k+1 start counting
复制代码
Exchanger. 它是一种两方栅栏, 各方在栅栏位置交换数据
Exchanger 使用示例:
- public static void main(String[] args) {
- Exchanger exchanger = new Exchanger();
- ExchangerRunnable er1 = new ExchangerRunnable(exchanger,"1");
- ExchangerRunnable er2 = new ExchangerRunnable(exchanger,"2");
- new Thread(er1).start();
- new Thread(er2).start();
- }
- class ExchangerRunnable implements Runnable{
- private Exchanger e;
- private Object o;
- public ExchangerRunnable(Exchanger e, Object o) {
- this.e = e;
- this.o = o;
- }
- @Override
- public void run() {
- Object pre=o;
- try {
- o=e.exchange(o);
- System.out.println("pre:"+pre+"now:"+o);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- }
来源: https://juejin.im/post/5b9cbc1ee51d450e4437b670