有时候要测试一下某个功能的并发能力, 又不要想借助于其他测试工具, 索性就自己写简单的 demo 模拟一个并发请求就最方便了. 如果熟悉 jemter 的测试某接口的并发能力其实更专业, 此处只是自己折腾着玩.
CountDownLatch 和 CyclicBarrier 是 jdk concurrent 包下非常有用的两个并发工具类, 它们提供了一种控制并发流程的手段. 其实查看源码它们都是在内部维护了一个计数器控制流程的
CountDownLatch: 一个或者多个线程, 等待其他多个线程完成某件事情之后才能执行;
CyclicBarrier: 多个线程互相等待, 直到到达同一个同步点, 再继续一起执行.
CountDownLatch 和 CyclicBarrier 的区别
CountDownLatch 的计数器, 线程完成一个记录一个, 计数器是递减 计数器, 只能使用一次
CyclicBarrier 的计数器 更像是一个阀门, 需要所有线程都到达, 阀门才能打开, 然后继续执行, 计数器是递增 计数器提供 reset 功能, 可以多次使用
另外 Semaphore 可以控同时访问的线程个数, 通过 acquire() 获取一个许可, 如果没有就等待, 而 release() 释放一个许可.
通常我们模拟并发请求, 一般都是多开几个线程, 发起请求就好了. 但是方式, 一般会存在启动的先后顺序了, 算不得真正的同时并发! 怎么样才能做到真正的同时并发呢? 是本文想说的点, java 中提供了闭锁 CountDownLatch, CyclicBarrier 刚好就用来做这种事就最合适了.
下面分别使用 CountDownLatch 和 CyclicBarrier 来模拟并发的请求
CountDownLatch 模拟
- package com.test;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.InputStreamReader;
- import java.io.OutputStream;
- import java.NET.HttpURLConnection;
- import java.NET.MalformedURLException;
- import java.NET.URL;
- import java.util.concurrent.CountDownLatch;
- public class LatchTest {
- public static void main(String[] args) throws InterruptedException {
- Runnable taskTemp = new Runnable() {
- // 注意, 此处是非线程安全的, 留坑
- private int iCounter;
- @Override
- public void run() {
- for(int i = 0; i < 10; i++) {
- // 发起请求
- // HttpClientOp.doGet("https://www.baidu.com/");
- iCounter++;
- System.out.println(System.nanoTime() + "[" + Thread.currentThread().getName() + "] iCounter =" + iCounter);
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- };
- LatchTest latchTest = new LatchTest();
- latchTest.startTaskAllInOnce(5, taskTemp);
- }
- public long startTaskAllInOnce(int threadNums, final Runnable task) throws InterruptedException {
- final CountDownLatch startGate = new CountDownLatch(1);
- final CountDownLatch endGate = new CountDownLatch(threadNums);
- for(int i = 0; i < threadNums; i++) {
- Thread t = new Thread() {
- public void run() {
- try {
- // 使线程在此等待, 当开始门打开时, 一起涌入门中
- startGate.await();
- try {
- task.run();
- } finally {
- // 将结束门减 1, 减到 0 时, 就可以开启结束门了
- endGate.countDown();
- }
- } catch (InterruptedException IE) {
- IE.printStackTrace();
- }
- }
- };
- t.start();
- }
- long startTime = System.nanoTime();
- System.out.println(startTime + "[" + Thread.currentThread() + "] All thread is ready, concurrent going...");
- // 因开启门只需一个开关, 所以立马就开启开始门
- startGate.countDown();
- // 等等结束门开启
- endGate.await();
- long endTime = System.nanoTime();
- System.out.println(endTime + "[" + Thread.currentThread() + "] All thread is completed.");
- return endTime - startTime;
- }
- }
执行结果
CyclicBarrier 模拟
- // 与 闭锁 结构一致
- public class LatchTest {
- public static void main(String[] args) throws InterruptedException {
- Runnable taskTemp = new Runnable() {
- private int iCounter;
- @Override
- public void run() {
- // 发起请求
- // HttpClientOp.doGet("https://www.baidu.com/");
- iCounter++;
- System.out.println(System.nanoTime() + "[" + Thread.currentThread().getName() + "] iCounter =" + iCounter);
- }
- };
- LatchTest latchTest = new LatchTest();
- // latchTest.startTaskAllInOnce(5, taskTemp);
- latchTest.startNThreadsByBarrier(5, taskTemp);
- }
- public void startNThreadsByBarrier(int threadNums, Runnable finishTask) throws InterruptedException {
- // 设置栅栏解除时的动作, 比如初始化某些值
- CyclicBarrier barrier = new CyclicBarrier(threadNums, finishTask);
- // 启动 n 个线程, 与栅栏阀值一致, 即当线程准备数达到要求时, 栅栏刚好开启, 从而达到统一控制效果
- for (int i = 0; i < threadNums; i++) {
- Thread.sleep(100);
- new Thread(new CounterTask(barrier)).start();
- }
- System.out.println(Thread.currentThread().getName() + "out over...");
- }
- }
- class CounterTask implements Runnable {
- // 传入栅栏, 一般考虑更优雅方式
- private CyclicBarrier barrier;
- public CounterTask(final CyclicBarrier barrier) {
- this.barrier = barrier;
- }
- public void run() {
- System.out.println(Thread.currentThread().getName() + "-" + System.currentTimeMillis() + "is ready...");
- try {
- // 设置栅栏, 使在此等待, 到达位置的线程达到要求即可开启大门
- barrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "-" + System.currentTimeMillis() + "started...");
- }
- }
执行结果
并发请求操作流程示意图如下:
此处设置了一道门, 以保证所有线程可以同时生效. 但是, 此处的同时启动, 也只是语言层面的东西, 也并非绝对的同时并发. 具体的调用还要依赖于 CPU 个数, 线程数及操作系统的线程调度功能等, 不过咱们也无需纠结于这些了, 重点在于理解原理!
来源: http://www.bubuko.com/infodetail-2802517.html