在我初学编程的时候, 还没写过完整点的项目就看过了一些高阶概念. 在没有实践时, 这些概念的神奇和强大之处很难被完全体会的. 而一旦自己在摸索中应用了, 瞬间觉得打开了一扇大门, 技能又提升了一个层次. 控制反转 (Inversion of Control) 就是这些强大概念之一. 一年前在 MPJ 老师的频道上了解到了, 但一直没自己独立创造场景用过. 直到最近在项目中遇到个坑才用起来.
其实控制反转或者依赖注入 (这两个感觉是同一个东西, 看你从什么角度看) 在前端框架中已经大量使用了. 最早由 Angular 普及, 后续的现代框架都有应用. 比如 React 开发中目前最火的组件设计模式 Render Props, 就是控制反转的一种应用. 离开框架, 在日常开发中, 应用这种技巧可以帮助我们解决很多棘手的问题, 今天就讲下我在开发中的一次应用.
项目场景是这样的: 技术栈是 Nuxt + vuex. 项目需要连接 web Socket, 然后根据 Socket 传来的数据, 对 Vuex 里面相应的数据进行修改. 公司为了节约成本, 将 Socket 数据压缩了, 而且不是全量推送, 这要求前端收到数据后对数据进行解压, 然后对数据进行遍历查找, 更新, 重新计算和排序, 总之对 Socket 数据的处理非常复杂. 为了不影响性能, 我把 Socket 连接和数据处理放进了 Web Worker. 先来看下项目结构.
下面是我封装的一个 Socket 工厂函数:
- // utils/socket.JS
- export default function Socket() {
- let heartBeat; // 心跳记录
- let lost = 0; // 心跳失败次数
- function decLost() {
- lost -= 1;
- }
- function connect() {
- const socket = new WebSocket("wss://xx.com");
- socket.onopen = () => {
- heartBeat = setInterval(() => {
- socket.send(2);
- lost += 1;
- if (lost === 3) {
- // 心跳失败超过 3 次, 断开重连
- clearInterval(heartBeat);
- socket.close();
- connect();
- }
- }, 5000);
- };
- socket.onerror = () => {
- clearInterval(heartBeat);
- socket.close();
- };
- socket.onclose = () => {
- setTimeout(() => {
- clearInterval(heart);
- connect();
- }, 3000);
- };
- return socket;
- }
- return Object.freeze({ decLost, connect });
- }
Socket 连接实现了心跳机制. onopen 之后, 每隔 5 秒向服务器发送 2, 并把心跳失败次数加 1; 服务器收到 2 之后会返回 3, 客户端收到 3 之后再把心跳失败次数减 1. 工厂函数暴露的 decLost 方法是为了外部在收到 3 之后把心跳次数减 1.
在 Web Worker 文件里面, 调用 Socket 工厂函数, 并连接 socket:
- // workers/socket.JS
- import Socket from "~/utils/socket";
- const socket = Socket();
- const socketConnection = socket.connect();
- socketConnection.onmessage = ({ data }) => {
- // 处理 socket 接收到的数据,
- // 处理完后通过 Web worker 接口发出去
- postMessage(result);
- };
- // Web worker 收到外部的数据后, 把数据发给 socket
- onmessage = ({ data }) => {
- socketConnection.send(data);
- };
然后在一个 Nuxt 插件里, 引入 socket worker, 收到 worker 里传来的数据后, 把数据交给 Vuex Store, 反之, 监听到相关 Vuex Mutation 后, 把 payload 传给 worker:
- // plugins/socket.JS
- // webpack 下导入 Web worker 的方式:
- import SocketWorker from "worker-loader!~/workers/socket.JS";
- const socketWorker = new SocketWorker();
- export default ({ store }) => {
- store.subscribe((mutation, state) => {
- // 监听到相关 Vuex Mutation
- });
- socketWorker.onmessage = ({ data }) => {
- // 监听 socket 发来的数据, 收到数据后,
- // 通过 store.commit() 来把数据存入 vuex store
- };
- };
这是我一开始写的 naive 版本, 看起来主要功能实现了, 而且封装和 Separation of concerns 做的也不错. 写完刚跑起来, 问题出现了.
挑战一: 等 socket 连接成功后再发起订阅
当应用打开后, 需要立即订阅推送数据, 包括用户登录状态下的私有数据和其它基础数据等. 但是当发起订阅时, socket 可能连接成功了, 也可能还没连接成功. 一开始我想设置个定时器, 过两秒后再发起订阅. 可是想想这种做法也太挫了. 第二个思路是在 socket 连接的 onopen 事件里执行订阅. 可是这样子会直接把以前的 onopen 覆盖掉, 而且这样做违反了封装原则. 剩下就一个办法了, 等连接成功后再发请求. 来看怎么做的:
- // workers/socket.JS
- // ...
- // const socketConnection ...
- const waitForConnection = timeout =>
- new Promise((resolve, reject) => {
- const check = () => {
- if (socketConnection.readyState === 1) {
- resolve();
- } else if ((timeout -= 100) <0) {
- reject("socket connection timed out");
- } else {
- setTimeout(check, 100);
- }
- };
- setTimeout(check, 100);
- });
- // ... 其它细节
- onmessage = async ({ data }) => {
- try {
- await waitForConnection(2000);
- } catch (e) {
- console.error(e);
- return;
- }
- socketConnection.send(data);
- };
waitForConnection 函数会每隔 100 ms 检查 socket 连接状态, 如果连接状态是 1(成功), 则 resolve Promise, 否则一直隔 100 ms 检查一次, 直到连接成功或者超过指定时间.
在向 socket 发送数据之前, 先调用 waitForConnection, 并指定最多等 2 秒, 确保连接成功后再发送数据.
问题看起来解决了. 奇淫技巧都用上了, 让我满意了一会儿. 直到......
需求来了!
在 socket 断开重连后, 需要续订之前的订阅. 而包括用户 token 等订阅参数全都在 Vuex Store 里面. 那这下头疼了, Vuex store 里面是没法知道断开重连的, 而 worker 里面则根本没法读取 vuex store. 知道这个需求后我内心是崩溃的, 这根本没法写下去了啊! 就在我都快要打算调整架构重写时, 一拍脑袋灵光一闪, 试试控制反转!
首先要让 Socket 工厂函数有个判断重连的机制. 这个简单.
- // utils/socket.JS
- export default function Socket() {
- let connectCount = 0; // 连接成功次数
- // ... 细节, 见文章前面
- socket.onopen = () => {
- // 每次连接成功, 连接次数加 1
- connectCount += 1;
- if (connectCount> 1) {
- // 若连接次数超过一次, 则说明此次是重连
- // 在这里可以做些重连之后的操作了
- }
- };
- // ...
- }
重连之后具体做什么事, 这可以用依赖注入来实现. 先在 worker 文件里定义要做的事情, 然后在调用 Socket 工厂函数时注入方法:
- // worker/socket
- // 通过 postMessage 通知外部重连
- const notifyReconnect = () => {
- postMessage({ type: "reconnect" });
- };
- const socket = Socket(notifyReconnect);
然后在 Socket 函数里接收一下:
- // utils/socket.JS
- export default function Socket(notifyReconnect) {
- // ... 细节, 见文章前面
- socket.onopen = () => {
- connectCount += 1;
- if (connectCount> 1) {
- notifyReconnect();
- }
- };
- // ...
- }
我以为写到这里应该就可以了的, 然而我还是太天真了. 运行后, plugins/socket 文件里能接收到重连消息, 但是一直连接失败. 这个问题很诡异, 最后发现还是因为我对 Web Socket 掌握的不深导致的. 每次 socket 连接后, 生成的连接实例都是新的. 而我在 waitForConnection 方法里监听的 socketConnection 在关闭后, readyState 一直是 3(关闭状态), 导致 waitForConnection 方法一直报 timeout 错误.
剩下的最后问题是每次重连, 都更新连接实例. 方法如下:
- // worker/socket
- let socketConnection;
- const notifyReconnect = connection => {
- postMessage({ type: "reconnect" });
- socketConnection = connection;
- };
- const socket = Socket(notifyReconnect);
- socketConnection = socket.connect();
- // utils/socket
- export default function Socket(notifyReconnect) {
- // ... 细节, 见文章前面
- socket.onopen = () => {
- connectCount += 1;
- if (connectCount> 1) {
- notifyReconnect(socket);
- }
- };
- // ...
- }
Socket 函数在调用 notifyReconnect 时, 传入最新的连接实例 socket.
至此, 功能都实现了. 完整代码如下:
- // utils/socket.JS
- export default function Socket(notifyReconnect) {
- let heartBeat;
- let lost = 0;
- let connectCount = 0;
- function decLost() {
- lost -= 1;
- }
- function connect() {
- const socket = new WebSocket("wss://xx.com");
- socket.onopen = () => {
- connectCount += 1;
- if (connectCount> 1) {
- notifyReconnect(socket);
- }
- heartBeat = setInterval(() => {
- socket.send(2);
- lost += 1;
- if (lost === 3) {
- clearInterval(heartBeat);
- socket.close();
- connect();
- }
- }, 5000);
- };
- socket.onerror = () => {
- clearInterval(heartBeat);
- socket.close();
- };
- socket.onclose = () => {
- setTimeout(() => {
- clearInterval(heart);
- connect();
- }, 3000);
- };
- return socket;
- }
- return Object.freeze({ decLost, connect });
- }
- // workers/socket.JS
- import Socket from "~/utils/socket";
- let socketConnection;
- const notifyReconnect = connection => {
- postMessage({ type: "reconnect" });
- socketConnection = connection;
- };
- const socket = Socket(notifyReconnect);
- socketConnection = socket.connect();
- const waitForConnection = timeout =>
- new Promise((resolve, reject) => {
- const check = () => {
- if (socketConnection.readyState === 1) {
- resolve();
- } else if ((timeout -= 100) <0) {
- reject("socket connection timed out");
- } else {
- setTimeout(check, 100);
- }
- };
- setTimeout(check, 100);
- });
- socketConnection.onmessage = ({ data }) => {
- // 处理完数据后通过 Web worker 接口发出去
- postMessage(result);
- };
- onmessage = async ({ data }) => {
- try {
- await waitForConnection(2000);
- } catch (e) {
- console.error(e);
- return;
- }
- socketConnection.send(data);
- };
- // plugins/socket.JS
- import SocketWorker from "worker-loader!~/workers/socket.JS";
- const socketWorker = new SocketWorker();
- export default ({ store }) => {
- store.subscribe((mutation, state) => {});
- socketWorker.onmessage = ({ data }) => {
- if (data.type === "reconnect") {
- socketWorker.postMessage(/* 订阅参数 */);
- }
- };
- };
来源: https://juejin.im/post/5bac8e906fb9a05d1228133b