当网络中两个进程需要通信时, 我们往往会使用 Socket 来实现. Socket 都不陌生. 当三次握手成功后, 客户端与服务端就能通信, 并且, 彼此之间通信的数据包格式都是二进制, 由 TCP/IP 协议负责传输.
当客户端和服务端取得了二进制数据包后, 我们往往需要『萃取』出想要的数据, 这样才能更好的执行业务逻辑. 所以, 我们需要定义好数据结构来描述这些二进制数据的格式, 这就是通信网络协议. 简单讲, 就是需要约定好二进制数据包中每一段字节的含义, 比如从第 n 字节开始的 m 长度是核心数据, 有了这样的约定后, 我们就能解码出想要的数据, 执行业务逻辑, 这样我们就能畅通无阻的通信了.
网络协议的设计
概要划分
一个最基本的网络协议必须包含
数据的长度
数据
了解 TCP 协议的同学一定听说过粘包, 拆包 这两个术语. 因为 TCP 协议是数据流协议, 它的底层根据二进制缓冲区的实际情况进行包的划分. 所以, 不可避免的会出现粘包, 拆包 现象 . 为了解决它们, 我们的网络协议往往会使用一个 4 字节的 int 类型来表示数据的大小. 比如, Netty 就为我们提供了 LengthFieldBasedFrameDecoder 解码器, 它可以有效的使用自定义长度帧来解决上述问题.
同时一个好的网络协议, 还会将动作和业务数据分离. 试想一下, HTTP 协议的分为请求头, 请求体 --
请求头: 定义了接口地址, Http Method,HTTP 版本
请求体: 定义了需要传递的数据
这就是一种分离关注点的思想. 所以自定义的网络协议也可以包含:
动作指令: 比如定义 code 来分门别类的代表不同的业务逻辑
序列化算法: 描述了 JAVA 对象和二进制之间转换的形式, 提供多种序列化 / 反序列化方式. 比如 JSON,protobuf 等等, 甚至是自定义算法. 比如: rocketmq 等等.
同时, 协议的开头可以定义一个约定的魔数. 这个固定值 (4 字节), 一般用来判断当前的数据包是否合法. 比如, 当我们使用 telnet 发送错误的数据包时, 很显然, 它不合法, 会导致解码失败. 所以, 为了减轻服务器的压力, 我们可以取出数据包的前 4 个字节与固定的魔数对比, 如果是非法的格式, 直接关闭连接, 不继续解码.
网络协议结构如下所示:
+--------------+-----------+------------+-----------+----------+
| 魔数 (4) | code(1) | 序列化算法 (1) | 数据长度 (4) | 数据 (n) |
+--------------+-----------+------------+-----------+----------+
RocketMQ 通信网络协议的实现
RocketMQ 网络协议
这一小节, 我们从 RocketMQ 中, 分析优秀通信网络协议的实现. RocketMQ 项目中, 客户端和服务端的通信是基于 Netty 之上构建的. 同时, 为了更加有效的通信, 往往需要对发送的消息自定义网络协议.
RocketMQ 的网络协议, 从数据分类的角度上看, 可分为两大类
消息头数据 (Header Data)
消息体数据 (Body Data)
从左到右
第一段: 4 个字节整数, 等于 2,3,4 长度总和
第二段: 4 个字节整数, 等于 3 的长度. 特别的 byte[0] 代表序列化算法, byte[1~3] 才是真正的长度
第三段: 代表消息头数据, 结构如下
- {
- "code":0,
- "language":"JAVA",
- "version":0,
- "opaque":0,
- "flag":1,
- "remark":"hello, I am respponse /127.0.0.1:27603",
- "extFields":{
- "count":"0",
- "messageTitle":"HelloMessageTitle"
- }
- }
第四段: 代表消息体数据
RocketMQ 消息头协议详细如下:
Header 字段名 | 类型 | Request | Response |
---|---|---|---|
code | 整数 | 请求操作代码,请求接收方根据不同的代码做不同的操作 | 应答结果代码,0 表示成功,非 0 表示各种错误代码 |
language | 字符串 | 请求发起方实现语言,默认 JAVA | 应答接收方实现语言 |
version | 整数 | 请求发起方程序版本 | 应答接收方程序版本 |
opaque | 整数 | 请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用 | 应答方不做修改,直接返回 |
flag | 整数 | 通信层的标志位 | 通信层的标志位 |
remark | 字符串 | 传输自定义文本信息 | 错误详细描述信息 |
extFields | HashMap<String,String> | 请求自定义字段 | 应答自定义字段 |
编码过程
RocketMQ 的通信模块是基于 Netty 的. 通过定义 NettyEncoder 来实现对每一个 Channel 的 出栈数据进行编码, 如下所示:
- @ChannelHandler.Sharable
- public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
- @Override
- public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
- throws Exception {
- try {
- ByteBuffer header = remotingCommand.encodeHeader();
- out.writeBytes(header);
- byte[] body = remotingCommand.getBody();
- if (body != null) {
- out.writeBytes(body);
- }
- } catch (Exception e) {
- ...
- }
- }
- }
其中, 核心的编码过程位于 RemotingCommand 对象中, encodeHeader 阶段, 需要统计出消息总长度, 即:
定义消息头长度, 一个整数表示: 占 4 个字节
定义消息头数据, 并计算其长度
定义消息体数据, 并计算其长度
额外再加 4 是因为需要加入消息总长度, 一个整数表示: 占 4 个字节
- public ByteBuffer encodeHeader(final int bodyLength) {
- // 1> 消息头长度, 一个整数表示: 占 4 个字节
- int length = 4;
- // 2> 消息头数据
- byte[] headerData;
- headerData = this.headerEncode();
- // 再加消息头数据长度
- length += headerData.length;
- // 3> 再加消息体数据长度
- length += bodyLength;
- // 4> 额外加 4 是因为需要加入消息总长度, 一个整数表示: 占 4 个字节
- ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
- // 5> 将消息总长度加入 ByteBuffer
- result.putInt(length);
- // 6> 将消息的头长度加入 ByteBuffer
- result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
- // 7> 将消息头数据加入 ByteBuffer
- result.put(headerData);
- result.flip();
- return result;
- }
其中, encode 阶段会将 CommandCustomHeader 数据转换 HashMap<String,String>, 方便序列化
- public void makeCustomHeaderToNet() {
- if (this.customHeader != null) {
- Field[] fields = getClazzFields(customHeader.getClass());
- if (null == this.extFields) {
- this.extFields = new HashMap<String, String>();
- }
- for (Field field : fields) {
- if (!Modifier.isStatic(field.getModifiers())) {
- String name = field.getName();
- if (!name.startsWith("this")) {
- Object value = null;
- try {
- field.setAccessible(true);
- value = field.get(this.customHeader);
- } catch (Exception e) {
- log.error("Failed to access field [{}]", name, e);
- }
- if (value != null) {
- this.extFields.put(name, value.toString());
- }
- }
- }
- }
- }
- }
特别的, 消息头序列化支持两种算法:
- JSON
- RocketMQ
- private byte[] headerEncode() {
- this.makeCustomHeaderToNet();
- if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
- return RocketMQSerializable.rocketMQProtocolEncode(this);
- } else {
- return RemotingSerializable.encode(this);
- }
- }
这儿需要值得注意的是, encode 阶段将当前 RPC 类型和 headerData 长度编码到一个 byte[4] 数组中, byte[0] 位序列化类型.
- public static byte[] markProtocolType(int source, SerializeType type) {
- byte[] result = new byte[4];
- result[0] = type.getCode();
- result[1] = (byte) ((source>> 16) & 0xFF);
- result[2] = (byte) ((source>> 8) & 0xFF);
- result[3] = (byte) (source & 0xFF);
- return result;
- }
其中, 通过与运算 & 0xFF 取低八位数据.
所以, 最终 length 长度等于序列化类型 + header length + header data + body data 的字节的长度.
解码过程
RocketMQ 解码通过 NettyDecoder 来实现, 它继承自 LengthFieldBasedFrameDecoder, 其中调用了父类 LengthFieldBasedFrameDecoder 的构造函数
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
这些参数设置 4 个字节代表 length 总长度, 同时解码时跳过最开始的 4 个字节:
frame = (ByteBuf) super.decode(ctx, in);
所以, 得到的 frame= 序列化类型 + header length + header data + body data . 解码如下所示:
- public static RemotingCommand decode(final ByteBuffer byteBuffer) {
- // 总长度
- int length = byteBuffer.limit();
- // 原始的 header length,4 位
- int oriHeaderLen = byteBuffer.getInt();
- // 真正的 header data 长度. 忽略 byte[0] 的 serializeType
- int headerLength = getHeaderLength(oriHeaderLen);
- byte[] headerData = new byte[headerLength];
- byteBuffer.get(headerData);
- RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
- int bodyLength = length - 4 - headerLength;
- byte[] bodyData = null;
- if (bodyLength> 0) {
- bodyData = new byte[bodyLength];
- byteBuffer.get(bodyData);
- }
- cmd.body = bodyData;
- return cmd;
- }
- private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
- switch (type) {
- case JSON:
- RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
- resultJson.setSerializeTypeCurrentRPC(type);
- return resultJson;
- case ROCKETMQ:
- RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
- resultRMQ.setSerializeTypeCurrentRPC(type);
- return resultRMQ;
- default:
- break;
- }
- return null;
- }
其中, getProtocolType, 右移 24 位, 拿到 serializeType:
- public static SerializeType getProtocolType(int source) {
- return SerializeType.valueOf((byte) ((source>> 24) & 0xFF));
- }
getHeaderLength 拿到 0-24 位代表的 headerData length:
- public static int getHeaderLength(int length) {
- return length & 0xFFFFFF;
- }
小结
对于诸多中间件而言, 底层的网络通信模块往往会使用 Netty.Netty 提供了诸多的编解码器, 可以快速方便的上手. 本文从如何设计一个网络协议入手, 最终切入到 RocketMQ 底层网络协议的实现. 可以看到, 它并不复杂. 仔细研读几遍变能理解其奥义. 具体参考类 NettyEncoder,NettyDecoder,RemotingCommand.
来源: https://www.cnblogs.com/OceanEyes/p/protocol_design.html