1. 需求描述
1.1 场景说明:
由于, 微信端的业务需求量越来越大. 将业务与微信第三方事件处理耦合在一起的单一项目的结构已经逐渐暴露出, 承载能力不足的缺点. 所以, 需要将与微信的交互从业务逻辑中分离出, 单独进行管理和处理. 这样做有以下几点好处:
可以达到业务解耦分离.
可以为业务系统微服务化做准备.
可以在解耦后针对性的对不同业务系统进行优化.
减少业务系统错误的影响面.
1.2 技术难点说明:
微信中的通过 http 调用客户配置的回调地址的方式来进行事件通知. 事件通知分为两种类型:
发送 http 请求和数据以后, 客户服务器默认回复 success 字符串. 然后, 业务系统业务处理完成后通过指定的 http 地址通知微信方
发送 http 请求和数据的同一个请求中需要客户服务器在 response 中返回业务处理的结果.
由于, 我们已经将事件通知从主业务系统中抽离出. 所以, 微信事件管理系统会在收到微信事件通知以后, 通过 mq 的方式进行发布事件. 但是, 事件通知的第二种类型需要在一个 http 请求中将业务处理数据带回微信方. 此时, 就需要微信事件管理系统阻塞微信方发送来的 http 请求, 直到业务系统处理完业务数据并返回给系统. 这样, 我们就需要有一个灵活, 可靠的容器对被阻塞的微信方请求进行管理.
2. 理论基础
2.1Future 多线程模型:
2.1.1 模型介绍:
future 多线程模型, 是一种比较常用的多线程阻塞回调的模式. 具体逻辑结构如下图所示:
具体处理逻辑是这样的. 当一个请求发送到一个 future 模式的入口以后, 此时这个线程是阻塞的. 这时 future 的线程会进行后面的回调业务, 或者是直接开始等待. 直到, 其他线程唤醒 future 线程或者 future 等待超时. 此时, future 线程唤醒, 然后将具体的结果返回给调用线程.
- ThreadHolder(消息载体):
- import lombok.Data;
- import java.util.concurrent.Callable;
- /**
- * <p>
- * Description: com.javanewb.service
- * </p>
- * date:2017/10/31
- *
- * @author Dean.Hwang
- */
- @Data
- public abstract class ThreadHolder<T> implements Callable<T> {
- protected abstract T proData();//TODO 正常逻辑处理, 以及默认数据返回
- private T defaultData;// 返回的默认数据
- private Object needProData;// 接受到需要处理的数据
- private Long createTime = System.currentTimeMillis();
- private Long maxWaitTime;
- private String mdc;
- private RequestHolder<T> holder;
- @Override
- public T call() throws Exception {
- waitThread();
- System.out.println("Thread mdc:" + mdc + "notify");
- if (needProData == null) {
- holder.removeThread(mdc, false);
- return defaultData;
- }
- return proData();
- }
- public synchronized void waitThread() throws InterruptedException {
- this.wait(maxWaitTime);
- }
- public synchronized void notifyThread(Object needProData) {
- this.needProData = needProData;
- this.notify();
- }
- public synchronized void notifyDefault() {
- this.notify();
- }
- }
- RequestHolder(请求管理容器):
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicBoolean;
- /**
- * <p>
- * Description: com.javanewb.entity
- * </p>
- * date:2017/10/26
- *
- * @author Dean.Hwang
- */
- public class RequestHolder<T> {
- private Integer maxSize;
- private Long waitTime;
- public RequestHolder(Integer maxSize, Long maxWait, ExecutorService executorService) {
- if (maxSize> 1000) {
- throw new BusinessException(1022, "Bigger than max size num");
- }
- this.maxSize = maxSize;
- this.waitTime = maxWait;
- if (executorService != null) {
- this.executorService = executorService;
- } else {
- this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
- }
- }
- public RequestHolder(Integer maxSize, Long maxWait) {
- if (maxSize> 1000) {
- throw new BusinessException(1022, "Bigger than max size num");
- }
- this.waitTime = maxWait;
- this.maxSize = maxSize;
- this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
- }
- private ExecutorService executorService;
- private final Map<String, ThreadHolder<T>> holderMap = new ConcurrentHashMap<>();
- private List<String> mdcOrderList = new CopyOnWriteArrayList<>();
- private AtomicBoolean isCleaning = new AtomicBoolean(false);
- public ThreadHolder<T> removeThread(String mdc, boolean needNotifyDefault) {
- mdcOrderList.remove(mdc);
- ThreadHolder<T> holder;
- synchronized (holderMap) {
- holder = holderMap.get(mdc);
- holderMap.remove(mdc);
- }
- if (holder != null && needNotifyDefault) {
- holder.notifyDefault();
- }
- return holder;
- }
- public void notifyThread(String mdc, Object data) {
- ThreadHolder<T> holder = removeThread(mdc, false);
- if (holder != null) {
- holder.notifyThread(data);
- }
- }
- public Future<T> getFuture(String mdcStr, Class<? extends ThreadHolder<T>> holder) {
- if (StringUtil.isEmpty(mdcStr) || holder == null) {
- throw new BusinessException(1020, "Mdc target missing!!!");
- }
- Future<T> future;
- try {
- ThreadHolder<T> thread = holder.newInstance();
- holderMap.put(mdcStr, thread);
- mdcOrderList.add(mdcStr);
- thread.setMaxWaitTime(waitTime);
- thread.setMdc(mdcStr);
- thread.setHolder(this);
- future = executorService.submit(thread);
- cleanThreadPool();
- } catch (InstantiationException | IllegalAccessException e) {
- holderMap.remove(mdcStr);
- mdcOrderList.remove(mdcStr);
- throw new BusinessException(1021, "Thread Holder initialized failed");
- }
- return future;
- }
- private void cleanThreadPool() {
- if (mdcOrderList.size()>= maxSize && isCleaning.compareAndSet(false, true)) {
- try {
- mdcOrderList.subList(0, mdcOrderList.size() - maxSize).forEach(// 看测试效率, 看是否用并行 stream 处理
- mdc -> removeThread(mdc, true)
- );
- } finally {
- isCleaning.set(false);
- }
- }
- }
- }
- TestController(测试入口):
- import com.javanewb.entity.TestThreadHolder;
- import com.javanewb.thread.tools.RequestHolder;
- import com.keruyun.portal.common.filter.LoggerMDCFilter;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.MDC;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RestController;
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- /**
- * <p>
- * Description: com.javanewb.controller
- * </p>
- * <p>
- * Copyright: Copyright (c) 2015
- * </p>
- * <p>
- * </p>
- * date:2017/10/25
- *
- * @author Dean.Hwang
- */
- @Api
- @RestController
- @Slf4j
- public class TestController {
- private RequestHolder<String> holder = new RequestHolder<>(100, 500000L);
- private List<String> mdcList = new ArrayList<>();
- @ApiOperation(value = "请求同步测试", notes = "请求同步测试")
- @RequestMapping(value = "/async", method = RequestMethod.GET)
- public void async(HttpServletRequest request, HttpServletResponse response, String id) {
- Long startTime = System.currentTimeMillis();
- String mdc = MDC.get(LoggerMDCFilter.IDENTIFIER);
- mdcList.add(mdc);
- Future<String> future = holder.getFuture(id, TestThreadHolder.class);
- log.info(Thread.currentThread().getName());
- try {
- System.out.println(mdc + "Thread Wait");
- String result = future.get();
- response.getOutputStream().print(result);
- System.out.println("time:" + (System.currentTimeMillis() - startTime));
- } catch (IOException | ExecutionException | InterruptedException e) {
- e.printStackTrace();
- }
- }
- @ApiOperation(value = "释放 list 第一个", notes = "请求同步测试")
- @RequestMapping(value = "/notify", method = RequestMethod.GET)
- public String notifyFirst() {
- String mdc = mdcList.get(0);
- mdcList.remove(0);
- holder.notifyThread(mdc, "");
- return mdc;
- }
- @ApiOperation(value = "释放 list 第一个", notes = "请求同步测试")
- @RequestMapping(value = "/notifyThis", method = RequestMethod.GET)
- public String notifyThis(String mdc) {
- int idx = 0;
- for (int i = 0; i < mdcList.size(); i++) {
- if (mdcList.get(i).equals(mdc)) {
- idx = i;
- break;
- }
- }
- mdcList.remove(idx);
- holder.notifyThread(mdc, "");
- return mdc;
- }
- }
来源: https://juejin.im/post/5b8f6b87e51d450e3d2c8e61