前面将报警规则的制定加载解析, 以及报警执行器的定义加载和扩展进行了讲解, 基本上核心的内容已经完结, 接下来剩下内容就比较简单了
报警频率的统计
报警线程池
对外封装统一可用的解耦
I. 报警频率统计
1. 设计
前面在解析报警规则时, 就有一个 count 参数, 用来确定具体选择什么报警执行器的核心参数, 我们维护的方法也比较简单:
针对报警类型, 进行计数统计, 没调用一次, 则计数 + 1
每分钟清零一次
2. 实现
因为每种报警类型, 都维护一个独立的计数器
定义一个 map 来存储对应关系
private ConcurrentHashMap<String, AtomicInteger> alarmCountMap;
每分钟执行一次清零
- // 每分钟清零一把报警计数
- ScheduledExecutorService scheduleExecutorService = Executors.newScheduledThreadPool(1);
- scheduleExecutorService.scheduleAtFixedRate(() -> {
- for (Map.Entry<String, AtomicInteger> entry : alarmCountMap.entrySet()) {
- entry.getValue().set(0);
- }
- }, 0, 1, TimeUnit.MINUTES);
注意上面的实现, 就有什么问题?
有没有可能因为 map 中的数据过大 (或者 gc 什么原因), 导致每次清零花不少的时间, 而导致计数不准呢? (先不给出回答)
计数加 1 操作
- /**
- * 线程安全的获取报警总数 并自动加 1
- *
- * @param key
- * @return
- */
- private int getAlarmCount(String key) {
- if (!alarmCountMap.containsKey(key)) {
- synchronized (this) {
- if (!alarmCountMap.containsKey(key)) {
- alarmCountMap.put(key, new AtomicInteger(0));
- }
- }
- }
- return alarmCountMap.get(key).addAndGet(1);
- }
II. 报警线程池
目前也只是提供了一个非常简单的线程池实现, 后面的考虑是抽象一个基于 forkjoin 的并发框架来处理 (主要是最近接触到一个大神基于 forkjoin 写的并发器组件挺厉害的, 所以等我研究透了, 山寨一个)
- // 报警线程池
- private ExecutorService alarmExecutorService = new ThreadPoolExecutor(3, 5, 60,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(10),
- new DefaultThreadFactory("sms-sender"),
- new ThreadPoolExecutor.CallerRunsPolicy());
任务提交执行
- private void doSend(final ExecuteHelper executeHelper,
- final AlarmContent alarmContent) {
- alarmExecutorService.execute(() ->
- executeHelper.getIExecute().sendMsg(
- executeHelper.getUsers(),
- alarmContent.getTitle(),
- alarmContent.getContent()));
- }
III. 接口封装
这个就没什么好说的了
- public void sendMsg(String key, String content) {
- sendMsg(new AlarmContent(key, null, content));
- }
- public void sendMsg(String key, String title, String content) {
- sendMsg(new AlarmContent(key, title, content));
- }
- /**
- * 1. 获取报警的配置项
- * 2. 获取当前报警的次数
- * 3. 选择适当的报警类型
- * 4. 执行报警
- * 5. 报警次数 + 1
- *
- * @param alarmContent
- */
- private void sendMsg(AlarmContent alarmContent) {
- try {
- // get alarm config
- AlarmConfig alarmConfig = confLoader.getAlarmConfig(alarmContent.key);
- // get alarm count
- int count = getAlarmCount(alarmContent.key);
- alarmContent.setCount(count);
- ExecuteHelper executeHelper;
- if (confLoader.alarmEnable()) { // get alarm execute
- executeHelper = AlarmExecuteSelector.getExecute(alarmConfig, count);
- } else { // 报警关闭, 则走空报警流程, 将报警信息写入日志文件
- executeHelper = AlarmExecuteSelector.getDefaultExecute();
- }
- // do send msg
- doSend(executeHelper, alarmContent);
- } catch (Exception e) {
- logger.error("AlarmWrapper.sendMsg error! content:{}, e:{}", alarmContent, e);
- }
- }
接口封装完毕之后如何使用呢?
我们使用单例模式封装了唯一对外使用的类 AlarmWrapper, 使用起来也比较简单, 下面就是一个测试 case
- @Test
- public void sendMsg() throws InterruptedException {
- String key = "NPE";
- String title = "NPE 异常";
- String msg = "出现 NPE 异常了!!!";
- AlarmWrapper.getInstance().sendMsg(key, title, msg); // 微信报警
- // 不存在异常配置类型, 采用默认报警, 次数较小, 则直接部署出
- AlarmWrapper.getInstance().sendMsg("zzz", "不存在 xxx 异常配置", "报警嗒嗒嗒嗒");
- Thread.sleep(1000);
- }
使用起来比较简单, 就那么一行即可, 从这个使用也可以知道, 整个初始化, 就是在这个对象首次被访问时进行
构造函数内容如下:
- private AlarmWrapper() {
- // 记录每种异常的报警数
- alarmCountMap = new ConcurrentHashMap<>();
- // 加载报警配置信息
- confLoader = ConfLoaderFactory.loader();
- // 初始化线程池
- initExecutorService();
- }
所有如果你希望在自己的应用使用之前就加载好所有的配置, 不妨提前执行一下
AlarmWrapper.getInstance()
IV. 小结
基于此, 整个系统设计基本上完成, 当然代码层面也 ok 了, 剩下的就是使用手册了
再看一下我们的整个逻辑, 基本上就是下面这个流程了
提交报警
封装报警内容 (报警类型, 报警主题, 报警内容)
维护报警计数 (每分钟计数清零, 每个报警类型对应一个报警计数)
选择报警
根据报警类型选择报警规则
根据报警规则, 和当前报警频率选择报警执行器
若不开启区间映射, 则返回默认执行器
否则遍历所有执行器的报警频率区间, 选择匹配的报警规则
执行报警
封装报警任务, 提交线程池
报警执行器内部实现具体报警逻辑
V. 其他
来源: https://juejin.im/post/5a9a2189f265da238b7d75cd