更改项目需求以及项目之前阻塞模式问题的叙述已经在上一篇说过了, 详情可参考: https://www.cnblogs.com/darope/p/10276213.html ,https://yq.aliyun.com/articles/679654 两篇文章的介绍.
一, 关于 brpop 为什么要更改, 这里简单分析一下原版本的阻塞代码.
- @Override
- public void readyForControl(Service.ControlRequest request, StreamObserver<Service.ControlResponse> responseObserver) {
- byte[] uuidByte = request.getH().getId().toByteArray();
- JUUID juuid = new JUUID(uuidByte);
- String uuid = juuid.toString();
- logger.info("readyForControl uuid:" + uuid);
- // agent 上线
- Long onlineTime = System.currentTimeMillis();
- redisService.set(ONLINE_PREFIX + uuid, String.valueOf(System.currentTimeMillis()));
- onlineAgent(uuid);
- while (true) {
- try {
- // 暂时没有更好的办法处理, 降低两个 while 同时守护任务 Redis 的可能性
- if (needBreak(uuid, onlineTime)) {
- break;
- }
- List<Task> tasks = taskRedisMap.brpop(uuid);
- if (Objects.isNull(tasks)) {
- continue;
- }
- for(Task task : tasks) {
- //agent 重启后丢失一个任务; 老的 rpc 通道收到任务放回机制
- if (needBreak(uuid, onlineTime)) {
- taskRedisMap.pushTask(task);
- continue;
- }
- logger.info("task get uuid:" + uuid + "nodeId:" + task.getNodeId());
- Service.ControlResponse.ControlCmd controlCmd = Service.ControlResponse.ControlCmd.forNumber(task.getTaskType());
- Service.ControlResponse response = null;
- assert controlCmd != null;
- // 根据任务类型分配任务
- response = getControlResponseOption(task, controlCmd, null);
- logger.info("cmd:" + controlCmd + "nodeId" + task.getNodeId());
- if (Objects.isNull(response)) {
- logger.info("empty response. nodeId" + task.getNodeId());
- return;
- }
- // 通知业务调用方
- readyForControlEvent(task);
- logger.info("readyForControlEvent");
- task.setTaskStatus(TaskStatusEnum.BUSY);
- task.setStartExecTimeout(System.currentTimeMillis());
- task.setReceiveEvent(true);
- taskRedisMap.update(task);
- logger.info("onNext ...");
- responseObserver.onNext(response);
- logger.info("onNext OK...");
- }
- } catch (Throwable e) {
- logger.error("readyForControl 异常, uuid={}", e, uuid);
- }
- }
- }
客户端在服务端注册好自己传送过来的数据后, 调用 readyForControl, 请求服务端下发命令, 有几个 agent 客户端主机, 就会调用几次. 相同 agent 再次上线这里就会出现一个很大的问题, 原来的 agent 没有下线, 相同的 agent 再次上线, 这里会再次调用 readyForControl. 意味着相同的 agent 调用了两次, 而且新上线的 agent 后调用 readyForControl. 如果采用 brpop 的方式, 意味着一开始上线的 agent 调用 readyForControl 已经拿走了消息列队的 task 任务, 后来的只能拿不到, 空指针异常. 这里采用了一个不是办法的办法, 就是写一个死循环, 监听 agent 上线动作, 比对一下, 如果这个 agent 是后来上线的, 就会 break 掉, 杜绝了异常的发生. 但是这个操作会显得很臃肿, 而且效率不太好.
二, 更改为订阅模式或许会解决以上问题, 原因如下:
a. readyForControl 中, 只有一个订阅方法, 简洁很多
b. 不需要判断是不是相同 agent 上线的问题, 虽然新上线的 agent 跟之前的 agent 是同一个 agent, 但是跟 Redis 的发布订阅模式不冲突, 老的 agent 也会订阅到消息, 新的 agent 也会订阅到消息. 避免了一个大的用于判断 agent 新旧问题的死循环.
c. 效率更高, Redis 底层是 c 语言实现的, 借助 Redis 的机制来解决问题, 往往比自己实现逻辑来解决问题, 从本质上看来要可取.
三, 更改的过程中遇到的坑:
很遗憾, 很多坑是我想当然的以为造成的, 并没有严谨的考虑软件工程的思想以及大型程序运行的理论情形. 对此只会让我以为我还有很多的东西要学, 现在的出错, 只是为了记忆更深刻吧. 下面由浅入深做简单总结:
a. 从简单订阅模式, 到多线程订阅模式.
订阅模式本身是 Redis 自带的方法, 但是订阅模式是恒阻塞的, 一旦进入订阅的方法, 就会一直监听发布方是否发布了消息, 导致监听阻塞, 无法使调用方程序顺序执行. 虽然订阅方法父类有 onMessage 方法可以终止订阅, 但是不满足需要监听 agent 上线的逻辑策略. 对此需要增加多线程实现, 把订阅方法写到线程空间中去.
- @Override
- public void readyForControl(Service.ControlRequest request, StreamObserver<Service.ControlResponse> responseObserver) {
- byte[] uuidByte = request.getH().getId().toByteArray();
- JUUID juuid = new JUUID(uuidByte);
- String uuid = juuid.toString();
- Long agentId = taskRedisMap.getIdByUuid(uuid);
- // 调用订阅者线程
- SubThread subThread;
- subThread = new SubThread(redisService.getJedisPool(), agentId, responseObserver, taskRedisMap, applicationContext);
- subThread.start();
- logger.info("readyForControl uuid:" + uuid);
- // agent 上线
- redisService.set(ONLINE_PREFIX + uuid, String.valueOf(System.currentTimeMillis()));
- onlineAgent(uuid);
- }
更改之后的代码采用多线程开启订阅方法, 删除死循环维护 agent 上线的问题. 当多 agent 上线时, 会为每一个 agent 客户端开启一个属于自己的订阅方法, 由于 brpop 方式采用的是 uuid 转化为 agentId 对比任务 agentId 的方式, 以此来保证任务下发的准确性, 我就把频道更改为 uuid, 保证了任务下发的准确性.
b. 从专用频道订阅模式, 到通用频道订阅模式.
企业级项目必须考虑到资源的损耗和浪费情况, 如果每一个上线 agent 客户端均使用专用频道, 会增加 Redis 的负荷, 严重会让 Redis 睡觉. 如此看来为每一个 agent 开一个以 agent 的 id 相关的字符串为该 agent 的通道的话, 是绝对不可取的. 在师兄的引导下, 为此我折腾了一个下午, 目的就是不采用专用通道, 采用通用通道, 即所有任务 shell 的发布和订阅都在一个频道, 是谁的谁自己来领取. 但是怎么领取, 最后我通过把 uuid 传到订阅线程中, 从 onMessage 中转化为任务序列对比发布中的任务序列号, 取到我需要的 task 然后 return 到调用方. 看起来还不错, 我比较满意.
c. 潜在危险正在注视着我, 程序不是我想让他怎么运行就怎么运行
如我所想, 数据我是拿到了, 接着我在 readyForControl 调用这个线程后, 取到 agentId 对应的所有任务列表, 这样我就可以使用这个任务列表 onNext 到客户端啦, 像下面这样:
- // 通知业务调用方
- readyForControlEvent(task);
- logger.info("readyForControlEvent");
- task.setTaskStatus(TaskStatusEnum.BUSY);
- task.setStartExecTimeout(System.currentTimeMillis());
- task.setReceiveEvent(true);
- // 不通
- taskRedisMap.update(task);
- logger.info("onNext ...");
- responseObserver.onNext(response);
- logger.info("onNext OK...");
但是下面的方法是取不到我的 task 任务列表的所有数据的, 原因是, 当我进入到我的线程后, 我执行订阅方法, 对比我传入的 uuid 拿到属于该 agent 的一个 task. 然后调用这个线程的方法就会顺序执行了. 线程仍然存在, 只是再也没人调用了, readyForControl 代码程序一旦顺序执行, 就回不到调用线程的那个代码位置了. 尴尬的是, 理论上, 我的 task 列表里面只会有一条 task.
d. 没法在 readyForControl 中拿到所有 task 的列表, 我必须在线程里面单个处理, 仔细想想, 效率好像还提升了
逆行思维真的是很好的方式, 他会使你在向左走不通的情况下会考虑向右走一走, 最终走出这个死胡同. 程序封装的目的在于统一处理, 正常的方式是我所有 task 存入到我的 list 列表中, return 到调用方, 在 readyForControl 中统一 onNext 到 agent 客户端. 线程方式这种走不通, 只能把接下来所有操作 task 的代码传到线程中去, 在线程中一个一个 onNext 到客户端. 首先要做的是把需要用到的类实例传到线程中去, 该传进去的传进去, 该注入的注入到线程空间中去. 然后每次收到订阅消息 message, 我都把这个 message 转化为对应 agent 的 task 最后 onNext 下发到客户端. 看起来还不错, 但是即将迎来一个大坑.
e. 程序没报错, 为什么线程空间中的实例, 会频繁的报空指针?
代码看着已经没什么问题, 逻辑上也是可行的, 但测试的时候, 老是空指针. 查阅资料, 发现 Spring 为了安全, 禁止向线程空间中注入 bean. 网上的解决办法很多, 我需要注入的就是两个操作 task 任务流的 bean, 所以就采用了最简单的传递参数的方式, 外层先注入我需要的 bean, 然后当成调用线程的方法的参数. 线程方使用私有变量初始化类, 不采用注入的方式, 然后通过构造方法拿到传进来的类实例.
f. 或许你认为最不应该有问题的地方出现了问题
最终代码已经差不多可以使用了, 但是偶尔会抛异常, 检查了一晚上发现是 jdk 中操作 list 的问题. 至今不是很明白, 也希望有读到的大神给与评论原因. 一开始的逻辑, 在对比是不是我这个上线 agent 的 task 的时候, 我采用一个 if 判断. 在任务列表 tasks 不可能为空的情况下, if( 上线 agentId.compareToIgnoreCase(发布方发布的 Task 中的 agentId) != 0 ) 从 tasks 列表中移除这个不匹配的 task, 采用 tasks.remove(task) 的方式, else 下发这个任务到客户端 ------------》 更改为 if( 上线 agentId.compareToIgnoreCase(发布方发布的 Task 中的 agentId) == 0 ) 下发任务到客户端, else 不做处理. 就解决了异常问题, 看似两个逻辑是一样的, 或许是 remove 操作列表有什么需要注意的吧.
最终所有操作都在线程空间中处理, 订阅线程继承的的 onMessage 方法中, 分布对订阅到的 task 单独处理, 肢解了圆来 readyForControl 的代码:
- @Override
- public void onMessage(String channel, String message) { // 收到消息会调用
- logger.info("收到了发布者的消息, 频道为: {}, 消息为: {}", channel, message);
- tasks.add(message);
- key = TASK_PENDING_PREFIX + agentId;
- List<Map> taskList = tasks.stream().map(k -> Json2.fromJson(k, Map.class)).collect(Collectors.toList());
- if (taskList.size() == 0) {
- return;
- }
- // 筛选出 ShellTask
- List<ShellTask> shellTaskList = taskList.stream().filter(t -> Objects.equals(t.get("execType"), ExecScriptType.SHELL.getCode())).map(t -> Json2.fromJson(Json2.toJson(t), ShellTask.class)).collect(Collectors.toList());
- if (shellTaskList.size() == 0) {
- return;
- }
- List<Task> task_ = shellTaskList.stream().filter(t -> Objects.equals(t.getTaskStatus(), TaskStatusEnum.NOT_OPERATED)).collect(Collectors.toList());
- logger.info("task list : {}", task_);
- // 返回携带特定 uuid 订阅者 agent 的 task
- for (Task task : task_) {
- String keyPub = TASK_PENDING_PREFIX + task.getAgentId();
- logger.info("keyPub {}", keyPub);
- if (key.compareToIgnoreCase(keyPub) == 0){
- logger.info("task get uuid:" + key + "nodeId:" + task.getNodeId());
- Service.ControlResponse.ControlCmd controlCmd = Service.ControlResponse.ControlCmd.forNumber(task.getTaskType());
- Service.ControlResponse response = null;
- assert controlCmd != null;
- // 根据任务类型分配任务
- response = getControlResponseOption(task, controlCmd, null);
- logger.info("cmd:" + controlCmd + "nodeId" + task.getNodeId());
- if (Objects.isNull(response)) {
- logger.info("empty response. nodeId" + task.getNodeId());
- return;
- }
- // 通知业务调用方
- readyForControlEvent(task);
- logger.info("readyForControlEvent");
- task.setTaskStatus(TaskStatusEnum.BUSY);
- task.setStartExecTimeout(System.currentTimeMillis());
- task.setReceiveEvent(true);
- // 不通
- taskRedisMap.update(task);
- logger.info("onNext ...");
- responseObserver.onNext(response);
- logger.info("onNext OK...");
- }
- }
- }
- View Code
接下来的优化策略是, 判断 agent 上线时间, 如果是相同 agent 再次上线, 可以考虑让以前的 agent 下线, 而非继续订阅, 虽然继续订阅不会影响程序正常使用, 也不需要像 brpop 的方式来维护消息列队中的 task, 但是当 agent 某个客户端反复上线下线, 也会造成不必要的订阅资源浪费, 所以程序还是需要判断哪些 agent 需要下线处理.
因为是实习第一阶段, 自己还算个小白, 很多思考不到的地方, 踩了不少坑, 特此记录.
来源: https://www.cnblogs.com/darope/p/10277849.html