webSocket
传统的浏览器和服务器之间的交互模式是基于请求 / 响应的模式, 虽然可以使用 JS 发送定时任务让浏览器在服务器中拉取但是弊端很明显, 首先就是不能避免的延迟, 其次就是频繁的请求, 让服务器的压力骤然提升
WebSocket 是 H5 新增的协议, 用于构建浏览器和服务器之间的不受限的长连接的通信模式, 不再局限于请求 / 响应式的模型, 服务端可以主动推送消息给客户端,(游戏有某个玩家得奖了的弹幕)基于这个特性我们可以构建我们的实时的通信程序
协议详情:
websocket 建立连接时, 是通过浏览器发送的 HTTP 请求, 报文如下:
- GET ws://localhost:3000/ws/chat HTTP/1.1
- Host: localhost
- Upgrade: websocket
- Connection: Upgrade
- Origin: http://localhost:3000
- Sec-WebSocket-Key: client-random-string
- Sec-WebSocket-Version: 13
首先 GET 请求是以 ws 开头的
其中请求头中的
Upgrade: websocket Connection: Upgrade
表示尝试建立 WebSocket 连接
对于服务端的相应数据
- HTTP/1.1 101 Switching Protocols
- Upgrade: websocket
- Connection: Upgrade
- Sec-WebSocket-Accept: server-random-string
其中的 101, 表示服务端支持 WebSocket 协议, 双方基于 Http 请求, 成功建立起 WebSocket 连接, 双方之间的通信也不再通过 HTTP
JS 对 WebSocket 的封装对象
对于 JS 的 WebSocket 对象来说, 它常用 4 个回调方法, 以及两个主动方法
方法名 | 作用 |
---|---|
onopen() | 和服务端成功建立连接后回调 |
onmessage(e) | 收到服务端的的消息后回调, e 为消息对象 |
onerror() | 链接出现异常回调, 如服务端关闭 |
onclose() | 客户端单方面断开连接时回调 |
send(e) | 主动向服务端推送消息 |
close() | 主动关闭通道 |
再次对 WebSocket 进行封装
知道了回调函数回调时机, 我们接下来要做的就是在他的整个生命周期的不同回调函数中, 添加我们指定的动作就 ok 了, 下面是通过 Windows 定义一个全局的聊天对象 CHAT
- Windows.CHAT={
- var socket = null;
- // 初始化 socket
- init:function(){
- // 判断当前的浏览器是否支持 WebSocket
- if(Windows.WebSocket){
- // 检验当前的 webSocket 是否存在, 以及连接的状态, 如已经连接, 直接返回
- if(CHAT.socket!=null&&CHAT.socket!=undefined&&CHAT.socket.readyState==WebSocket.OPEN){
- return false;
- }else{// 实例化 , 第二个 ws 是我们可以自定义的, 根据后端的路由来
- CHAT.socket=new WebSocket("ws://192.168.43.10:9999/ws");
- // 初始化 WebSocket 原生的方法
- CHAT.socket.onopen=CHAT.myopen();
- CHAT.socket.onmessage=CHAT.mymessage();
- CHAT.socket.onerror=CHAT.myerror();
- CHAT.socket.onclose=CHAT.myclose();
- }
- }else{
- alert("当前设备不支持 WebSocket");
- }
- }
- // 发送聊天消息
- chat:function(msg){
- // 如果的当前的 WebSocket 是连接的状态, 直接发送 否则从新连接
- if(CHAT.socket.readyState==WebSocket.OPEN&&CHAT.socket!=null&&CHAT.socket!=undefined){
- socket.send(msg);
- }else{
- // 重新连接
- CHAT.init();
- // 延迟一会, 从新发送
- setTimeout(1000);
- CHAT.send(msg);
- }
- }
- // 当连接建立完成后对调
- myopen:function(){
- // 拉取连接建立之前的未签收的消息记录
- // 发送心跳包
- }
- mymessage:function(msg){
- // 因为服务端可以主动的推送消息, 我们提前定义和后端统一 msg 的类型, 如, 拉取好友信息的消息, 或 聊天的消息
- if(msg== 聊天内容){
- // 发送请求签收消息, 改变请求的状态
- // 将消息缓存到本地
- // 将 msg 转换成消息对象, 植入 html 进行渲染
- }else if(msg== 拉取好友列表){
- // 发送请求更新好友列表
- }
- }
- myerror:function(){
- console.log("连接出现异常...");
- }
- myclose:function(){
- console.log("连接关闭...");
- }
- keepalive: function() {
- // 构建对象
- var dataContent = new App.DataContent(App.KEEPALIVE, null, null);
- // 发送心跳
- CHAT.chat(JSON.stringify(dataContent));
- // 定时执行函数, 其他操作
- // 拉取未读消息
- // 拉取好友信息
- }
- }
对消息类型的约定
WebSocket 对象通过 send(msg); 方法向后端提交数据, 常见的数据如下:
客户端发送聊天消息
客户端签收消息
客户端发送心跳包
客户端请求建立连接
为了使后端接收到不同的类型的数据做出不同的动作, 于是我们约定发送的 msg 的类型;
- // 消息 action 的枚举, 这个枚举和后端约定好, 统一值
- CONNECT: 1, // 第一次 (或重连) 初始化连接
- CHAT: 2, // 聊天消息
- SIGNED: 3, // 消息签收
- KEEPALIVE: 4, // 客户端保持心跳
- PULL_FRIEND:5, // 重新拉取好友
- // 消息模型的构造函数
- ChatMsg: function(senderId, receiverId, msg, msgId){
- this.senderId = senderId;
- this.receiverId = receiverId;
- this.msg = msg;
- this.msgId = msgId;
- }
- // 进一步封装两个得到最终版消息模型的构造函数
- DataContent: function(action, chatMsg, extand){
- this.action = action;
- this.chatMsg = chatMsg;
- this.extand = extand;
- }
如何发送数据?
我们使用 JS, 给发送按钮绑定点击事件, 一经触发, 从缓存中获取出我们需要的参数, 调用
CHAT.chat(JSON.stringify(dataContent));
后端 netty 会解析 dataContent 的类型, 进一步处理
如何签收未与服务器连接时好友发送的消息?
消息的签收时机:
之所以会有未签收的信息, 是因为客户端未与服务端建立 WebSocket 连接, 当服务端判断他维护的 channel 组中没有接受者的 channel 时, 不会发送数据, 而是把数据持久化到数据库, 并且标记 flag = 未读, 所以我们签收信息自然放在客户端和服务端建立起连接时的回调函数中执行
步骤:
客户端通过 JS 请求, 拉取全部的和自己相关的 flag = 未读的消息实体列表
从回调函数数中, 把列表中的数据取出, 缓存在本地
将列表中的数据回显在 HTML 页面中
和后端约定, 将该列表中所有的实例的 id 取出, 用逗号分隔拼接成字符串, 以 action=SIGNED 的方式发送给后端, 让其进行签收
Netty 对 WebSocket 的支持
首先每一个 Netty 服务端的程序都是神似的, 想创建不同的服务端, 就得给 Netty 装配的 pipeline 不同的 Handler
针对聊天程序, 处理 String 类型的 JSON 信息, 我们选取 SimpleChannelInboundHandler, 他是个典型的入站处理器, 并且如果我们没有出来数据, 她会帮我们回收 重写它里面未实现抽象方法, 这些抽象方法同样是回调方法, 当一个新的 Channel 进来, 它注册进 Selector 上的过程中, 会回调不同的抽象方法
方法名 | 回调时机 |
---|---|
handlerAdded(ChannelHandlerContext ctx) | Pepiline 中的 Handler 添加完成回调 |
channelRegistered(ChannelHandlerContext ctx) | channel 注册进 Selector 后回调 |
channelActive(ChannelHandlerContext ctx) | channel 处于活动状态回调 |
channelReadComplete(ChannelHandlerContext ctx) | channel, read 结束后回调 |
userEventTriggered(ChannelHandlerContext ctx, Object evt) | 当出现用户事件时回调, 如 读 / 写 |
channelInactive(ChannelHandlerContext ctx) | 客户端断开连接时回调 |
channelUnregistered(ChannelHandlerContext ctx) | 客户端断开连接后, 取消 channel 的注册时回调 |
handlerRemoved(ChannelHandlerContext ctx) | 取消 channel 的注册后, 将 channel 移除 ChannelGroup 后回调 |
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) | 出现异常时回调 |
handler 的设计编码
要做到点对点的聊天, 前提是服务端拥有全部的 channel 因为所有数据的读写都依赖于它, 而 netty 为我们提供了 ChannelGroup 用来保存所有新添加进来的 channel, 此外点对点的聊天, 我们需要将用户信息和它所属的 channel 进行一对一的绑定, 才可以精准的匹配出两个 channel 进而数据交互, 因此添加 UserChannel 映射类
- public class UserChanelRelationship {
- private static HashMap<String, Channel> manager = new HashMap<>();
- public static void put(String sendId,Channel channel){
- manager.put(sendId,channel);
- }
- public static Channel get(String sendId){
- return manager.get(sendId);
- }
- public static void outPut(){
- for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
- System.out.println("UserId:"+entry.getKey() + "channelId:"+entry.getValue().id().asLongText());
- }
- }
- }
我们把 User 和 Channel 之间的关系以键值对的形式存放进 Map 中, 服务端启动后, 程序就会维护这个 map, 那么问题来了? 什么时候添加两者之间的映射关系呢? 看上 handler 的回调函数, 我们选择 channelRead0() 当我们判断出 客户端发送过来的信息是 CONNECT 类型时, 添加映射关系
下面是 handler 的处理编码
- public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
- // 用于管理整个客户端的 组
- public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- @Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
- Channel currentChanenl = channelHandlerContext.channel();
- // 1. 获取客户端发送的消息
- String content = frame.text();
- System.out.println("content:"+content);
- // 2. 判断不同的消息的类型, 根据不同的类型进行不同的处理
- // 当建立连接时, 第一次 open , 初始化 channel, 将 channel 和数据库中的用户做一个唯一的关联
- DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
- Integer action = dataContent.getAction();
- if (action == MsgActionEnum.CHAT.type) {
- // 3. 把聊天记录保存到数据库
- // 4. 同时标记消息的签收状态 [未签收]
- // 5. 从我们的映射中获取接受方的 chanel 发送消息
- // 6. 从 chanelGroup 中查找 当前的 channel 是否存在于 group, 只有存在, 我们才进行下一步发送
- // 6.1 如果没有接受者用户 channel 就不 writeAndFlush, 等着用户上线后, 通过 JS 发起请求拉取未接受的信息
- // 6.2 如果没有接受者用户 channel 就不 writeAndFlush, 可以选择推送
- }else if (action == MsgActionEnum.CONNECT.type){
- // 当建立连接时, 第一次 open , 初始化 channel, 将 channel 和数据库中的用户做一个唯一的关联
- String sendId = dataContent.getChatMsg().getSenderId();
- UserChanelRelationship.put(sendId,currentChanenl);
- }else if(action == MsgActionEnum.SINGNED.type){
- // 7. 当用户没有上线时, 发送消息的人把要发送的消息持久化在数据库, 但是却没有把信息写回到接受者的 channel, 把这种消息称为未签收的消息
- // 8. 签收消息, 就是修改数据库中消息的签收状态, 我们和前端约定, 前端如何签收消息在上面有提到
- String extend = dataContent.getExtand();
- // 扩展字段在 signed 类型代表 需要被签收的消息的 id, 用逗号分隔
- String[] msgIdList = extend.split(",");
- List<String> msgIds = new ArrayList<>();
- Arrays.asList(msgIdList).forEach(s->{
- if (null!=s){
- msgIds.add(s);
- }
- });
- if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
- // 批量签收
- }
- }else if (action == MsgActionEnum.KEEPALIVE.type){
- // 6. 心跳类型
- System.out.println("收到来自 channel 为" +currentChanenl+"的心跳包...");
- }
- }
- // handler 添加完成后回调
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- // 获取链接, 并且若想要群发的话, 就得往每一个 channel 中写数据, 因此我们得在创建连接时, 把 channel 保存起来
- System.err.println("handlerAdded");
- users .add(ctx.channel());
- }
- // 用户关闭了浏览器回调
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- // 断开连接后, channel 会自动移除 group
- // 我们主动的关闭进行, channel 会被移除, 但是我们如果是开启的飞行模式, 不会被移除
- System.err.println("客户端 channel 被移出:"+ctx.channel().id().asShortText());
- users.remove(ctx.channel());
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- // 发生异常关闭 channel, 并从 ChannelGroup 中移除 Channel
- ctx.channel().close();
- users.remove(ctx.channel());
- }
... 其他方法
前后端的心跳维持
双方建立起 WebSocket 连接后, 服务端需要明确的知道, 自己维护的诸多 channel 中, 谁已经挂掉了, 为了提高性能, 需要及早把废弃的 channel 移除 ChanelGroup
客户端杀掉了进程, 或者开启了飞行模式, 这时服务端是感知不到它维护的 channel 中已经有一个不能使用了, 首先来说, 维护一个不能使用的 channel 会影响性能, 而且当这个 channel 的好友给他发送消息时, 服务端认为用户在线, 于是向一个不存在的 channel 写入刷新数据, 会带来额外的麻烦
这时我们就需要添加心跳机制, 客户端设置定时任务, 每个一段时间就往服务端发送心跳包, 心跳包的内容是什么不是重点, 它的作用就是告诉服务端自己还 active, N 多个客户端都要向服务端发送心跳, 这并不会增加服务端的请求, 因为这个请求是通过 WebSocket 的 send 方法发送过去的, 只不过 dataContent 的类型是 KEEPALIVE , 同样这是我们提前约定好的(此外, 服务端向客户端发送心跳看起来是没有必要的)
于是对于后端来说, 我们发送的心跳包, 会使得当前客户端对应的 channel 的 channelRead0()方法回调, netty 为我们提供了心跳相关的 handler, 每一次的 chanelRead0()的回调, 都是 read/write 事件, 下面是 netty 对心跳的支持的实现
- /**
- * @Author: Changwu
- * @Date: 2019/7/2 9:33
- * 我们的心跳 handler 不需要实现 handler0 方法, 我们选择, 直接继承 SimpleInboundHandler 的父类
- */
- public class HeartHandler extends ChannelInboundHandlerAdapter {
- // 我们重写 EventTrigger 方法
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- // 当出现 read/write 读写写空闲时触发
- if(evt instanceof IdleStateEvent){
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state()== IdleState.READER_IDLE){ // 读空闲
- System.out.println(ctx.channel().id().asShortText()+"读空闲...");
- }else if (event.state()==IdleState.WRITER_IDLE){
- System.out.println(ctx.channel().id().asShortText()+"写空闲...");
- }else if (event.state()==IdleState.ALL_IDLE){
- System.out.println("channel 读写空闲, 准备关闭当前 channel , 当前 UsersChanel 的数量:"+MyHandler.users.size());
- Channel channel = ctx.channel();
- channel.close();
- System.out.println("channel 关闭后, UsersChanel 的数量:"+MyHandler.users.size());
- }
- }
- }
Handler 我们不再使用 SimpleChannelInboundHandler 了, 因为它当中的方法都是抽象方法, 而我们需要回调的函数时机是, 每次当有用户事件时回调, 比如 read,write 事件, 这些事件可以证明 channel 还活着, 对应的方法是 userEventTriggered()
此外, ChannelInboundHandlerAdapter 是 netty 中, 适配器模式的体现, 它实现了全都抽象方法, 然后他的实现方法中并不是在干活, 而是把这个事件往下传播下去了, 现在我们重写 userEventTriggered() 执行的就是我们的逻辑
另外, 我们需要在 pipeline 中添加 handler
...
/ 添加 netty 为我们提供的 检测空闲的处理器, 每 20 40 60 秒, 会触发 userEventTriggered 事件的回调
- pipeline.addLast(new IdleStateHandler(10,20,30));
- // todo 添加心跳的支持
- pipeline.addLast("heartHandler",new HeartHandler());
服务端主动向客户端推送数据
如, 添加好友的操作中, A 向 B 发送添加好友请求的过程, 会经过如下几步
A 向服务端发送 Ajax 请求, 将自己的 id, 目标朋友的 id 持久化到 数据库, 请求 friend_request 表
用户 B 上线, 通过 JS, 向后端拉取 friend_request 表中有没有关于自己的信息, 于是服务端把 A 的请求给 B 推送过去
在 B 的前端回显 A 的请求, B 进一步处理这个信息, 此时两种情况
B 拒绝了 A 的请求: 后端把 friend_request 表关于 AB 的信息清除
B 同意了 A 的请求: 后端在 firend_List 表中, 将 AB 双方的信息都持久化进去, 这时我们可以顺势在后端的方法中, 给 B 推送最新的联系人信息, 但是这不属于主动推送, 因为这次会话是客户端主动发起的
但是 A 却不知道, B 已经同意了, 于是需要给 A 主动的推送数据, 怎么推送呢? 我们需要在上面的 UserChannel 的关系中, 拿出发送者的 channel, 然后往回 writeAndFlush 内容, 这时 A 就得知 B 已经同意了, 重新加载好友列表
来源: https://www.cnblogs.com/ZhuChangwu/p/11184654.html