引言
中后台仪表盘是一个非常复杂, 特别是当需要全面屏运用时, 数据的实时性需求非常高. webSocket 不管在什么环境中使用其实都是非常简单, 各现代浏览器实现标准都很统一, 而且接口也足够简单.
即便是在 Angular 也是如此, 只需要简单几行代码就能使用 WebSocket.
- const ws = new WebSocket('wss://echo.websocket.org');
- ws.onmessage = (e) => {
- console.log('message', e);
- }
若需要向服务端发送消息, 则:
ws.send(`content`);
在 Angular 里绝大多数的人都会根据上述代码进一步拓展, 比如统一消息解析, 错误处理, 多路复用等, 并最终将其封装成一个服务类.
事实上, RxJS 也包裹了一个 WebSocket Subject, 位于 rxjs/websocket.
如何使用
假如将上面的示例使用 RxJS 来写, 则:
- import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
- const ws = webSocket('wss://echo.websocket.org');
- ws.subscribe(res => {
- console.log('message', res);
- });
- ws.next(`content`);
webSocket 是一个工厂函数, 所生产出来的 WebSocketSubject 对象可被多次订阅, 若未订阅或取消最后一个订阅时都会导致 WebSocket 连接中断, 当再一次订阅时会重新自动连接.
WebSocketSubjectConfig
webSocket 除了接收字符串 (WebSocket 服务远程地址) 外, 还允许指定更复杂的配置项.
默认情况下, 消息是使用 JSON.parse 和 JSON.stringify 对消息格式序列化和反序列化操作, 所以不管消息发送或接收都以 JSON 为准, 可通过 serializer,deserializer 属性来改变.
若需要关心 WebSocket 什么时候开始或结束(closeObserver), 则:
- const open$ = new Subject();
- const ws = webSocket({
- url: 'wss://echo.websocket.org',
- openObserver: open$
- });
- // 订阅打开事件
- open$.subscribe(() => {});
消息
WebSocketSubject 也是 Subject 的变体之一, 因此订阅它表示接收消息, 反之则利用 next,complete,error 来维护消息的推送.
使用 next 来发送消息
使用 complete 会尝试检测是否最后一个订阅, 若是将会关闭连接
使用 error 相当于原始 close 方法且必须提供
{ code: number, reason?: string}
参数, 注意 code 务必遵守取值范围
可被重放
调用 next 发送消息时若 WebSocket 连接中断(例如: 没人订阅时), 消息会被缓存当下一次重新连接以后会按顺序发送. 这对于异步世界里非常方便, 我们只需要确保 Angular 启动前初始化好 WebSocket 不管什么时候订阅接收消息, 都可以随时发送也无须等待.
事实上这一点是 RxJS WebSocket 默认情况下是通过 webSocket 所生产的 WebSocketSubject 其本质上是 ReplaySubject 的 "重放" 能力. 当然你可以通过 webSocket 的第二个参数改变这种行为.
多路复用
一般来说我们不太可能只会一个 Web Socket 服务完成所有的事, 然而也不太可能针对每一个业务实例创建一个 webSocket. 往往我们会增加一层网关并将这些业务 WebSocket 进行汇总, 对于前端始终只需要一个连接, 这就是多路复用存在的意义.
而核心是必须要让后端知道, 什么时候发送什么消息给什么样的服务.
首先必须先使用 multiplex 方法来创建 Observable 以便订阅某一路消息, 它有三个参数来帮助我们区分消息:
subMsg 告知正在订阅哪一路消息
unsubMsg 告知取消订阅哪一路消息
messageFilter 过滤消息, 使订阅者只接收哪一路消息
- const ws = webSocket('wss://echo.websocket.org');
- const user$ = this.ws.multiplex(
- () => ({ type: 'subscribe', tag: 'user' }),
- () => ({ type: 'unsubscribe', tag: 'user' }),
- message => message.type === 'user'
- );
- user$.subscribe(message => console.log(message));
- const todo$ = this.ws.multiplex(
- () => ({ type: 'subscribe', tag: 'todo' }),
- () => ({ type: 'unsubscribe', tag: 'todo' }),
- message => message.type === 'todo'
- );
- todo$.subscribe(message => console.log(message));
user$ 流和 todo$ 流他们共用一个 WebSocket 连接, 这便是多路复用.
虽然订阅是通过 multiplex 创建的, 然后消息的推送依然还是需要使用 ws.next().
总结
这原本是对内部一个简单培训, 然而我发现竟然极少人会讨论 RxJS 里面 Web Socket 的实现.
其实一直有想着要给 https://ng-alain.com/ 内置 WebSocket, 只是就封装角度来讲完全没有价值, 因为已经足够优雅.
来源: https://segmentfault.com/a/1190000016494649