本文为该系列的第三篇文章, 设计需求为: 服务端程序和众多客户端程序通过 TCP 协议进行通信, 通信双方需通信的消息种类众多. 上一篇文章以一个具体的需求为例, 探讨了指定的 Java 消息对象与其相应的二进制数据帧相互转换的方法. 本文仍以该实例为例, 探讨该自定义通信协议的具体工作流程, 以及如何以注册的形式灵活插拔通信消息对象.
1. 以注册的形式实现通信消息对象的统一管理
通过该系列的第二篇文章可知, 各个消息对象的编解码器类均拥有一个静态工厂方法, 用于手动传入功能位及功能文字描述, 进而生成包含这些参数的编解码器. 如此设计, 使得所有消息的功能位和文字描述均能够统一管理, 降低维护成本.
根据上述需求, 可通过 Map 容器管理所有的编解码器, 有如下优点:
进行消息对象生成操作时, 可直接使用相应编解码器的消息对象静态创建方法.
进行消息对象的编码操作时, 已拥有该 Java 消息对象, 即可知道消息对象的功能位, 据此可获取相应的编解码器; 或者, 每个 Java 消息对象均内含相应编解码器的引用, 故可直接对该消息对象进行编码操作.
进行二进制数据帧的解码操作时, 数据帧中已包含了消息的功能位, 据此可获取相应的编解码器, 而后可以对该数据帧进行解析, 生成相应的 Java 消息对象.
通信消息对象注册方法如下所示:
- /**
- * 消息对象的注册
- *
- * @param toolkit 消息对象编解码器容器的工具类
- */
- private void initialMsg() {saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客户端解锁"));
- saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客户端初始化"));
- saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客户端 ID 设置"));
- saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客户端别名设置"));
- ... ...
- }
- /**
- * 将普通消息对象及其回复消息对象的编解码器均保存到 HashMap 中
- *
- * @param baseMsgCodec 特定的消息对象编解码器
- */
- private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) {
- saveSpecialMsgCodec(baseMsgCodec);
- baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail());
- saveSpecialMsgCodec(baseMsgCodec);
- }
- /**
- * 将消息对象的编解码器保存到 HashMap 中
- *
- * @param baseMsgCodec 特定的编解码器
- */
- private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) {
- HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec);
- }
上述代码表明, 如果有新的业务需求, 需要增删插拔业务消息对象, 只需在 initialMsg() 方法中, 对相应编解码器的注册语句进行增删即可.
saveNormalMsgCodec(BaseMsgCodec)
方法可以同时注册特定业务消息对象及其通用回复消息对象, 操作方法清晰, 简洁.
所以, 在启动该 Java 程序时, 只需要在启动过程中, 执行上述 initialMsg() 方法, 即可完成所有业务消息对象的注册.
2. 多个消息对象自由组合进同一个数据帧的实现原理
由该系列的第一篇文章可知, 如果某二进制数据帧所要传输的数据体部分内容很少, 导致一个帧的大部分容量均被帧头占据, 导致有效数据的占比很小, 这就产生了巨大的浪费, 故数据帧的数据体部分由子帧组成, 同一类子帧均可被组装进同一个数据帧. 如此做法, 整个通信链路的数据量会明显减少, IO 负担也会因此减轻.
该需求的实现原理如下所示:
- /**
- * 启动一个 Channel 的定时任务, 用于间隔指定的时间对消息队列进行轮询, 并发送指定数据帧
- *
- * @param deque 指定的消息发送队列
- * @param channelId 指定 Channel 的序号
- */
- private void startMessageQueueTask(LinkedBlockingDeque<BaseMsg> deque, Integer channelId) {
- executorService.scheduleWithFixedDelay(() -> {
- try {
- BaseMsg baseMsg = deque.take(); // 从队列中取出一个消息对象, 队列为空时阻塞
- Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待极短的时间, 保证队列中缓存尽可能多的对象
- Channel channel = touchChannel(channelId); // 获取指定的待发送的 Channel
- List<ByteBuf> dataList = new ArrayList<>();// 子帧容器
- ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 编码一个子帧
- dataList.add(data);
- touchNeedReplyMsg(baseMsg); // 对该子帧设置检错重发任务
- int length = data.readableBytes();
- int flag = baseMsg.combineFrameFlag(); // 获取消息对象标识
- while (true) {
- BaseMsg subMsg = deque.peek(); // 查看队列中的第一个消息对象
- if (subMsg == null || subMsg.combineFrameFlag() != flag) {
- break; // 消息对象标识不同, 即欲生成的主帧帧头不同, 不能组合进同一主帧
- }
- data = subMsg.subFrameEncode(channel.alloc().buffer());
- if (length + data.readableBytes()> FrameSetting.MAX_DATA_LENGTH) {
- break;
- }
- length += data.readableBytes();
- dataList.add(data); // 组合进了同一主帧
- deque.poll(); // 从队列中移除该消息对象
- touchNeedReplyMsg(subMsg);
- }
- FrameMajorHeader frameHeader = new FrameMajorHeader(
- baseMsg.getMajorMsgId(),
- baseMsg.getGroupId(),
- baseMsg.getDeviceId(),
- length); // 生成主帧帧头消息对象
- channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入 Channel 进行发送
- } catch (InterruptedException e) {
- logger.warn("消息队列定时发送任务被中断");
- }
- }, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS);
- }
由代码可知, 待发送的消息对象均被送入指定的发送队列进行缓存, 某客户端相应的线程对队列进行操作, 取出消息对象并进行编码, 组装, 发送等. 当然, 当客户端数量较多时, 上述的线程实现方式可采用 Netty 的 NIO 方式进行优化, 以降低系统开销.
由上述描述可知, 欲发送一个消息对象, 只需将该消息对象送入相应的发送队列即可.
3. 实际业务消息对象的编解码
3.1 消息对象的编码方式
由于每个 Java 消息对象均内含相应编解码器的引用, 故可直接对该消息对象进行编码操作, 代码如下:
- public abstract class BaseMsg implements Cloneable {
- private final BaseMsgCodec msgCodec;
- ... ...
- /**
- * 将 java 消息对象编码为 TCP 子帧
- *
- * @param buffer 空白的 TCP 子帧的容器
- * @return 保存有 TCP 子帧的容器
- */
- public ByteBuf subFrameEncode(ByteBuf buffer) {
- return msgCodec.code(this, buffer);
- }
- }
3.2 消息对象的解码方式
首先根据数据帧的帧头, 即可解析出 FrameMajorHeader 对象, 然后即可调用如下方法完成子帧的解析工作. 实现原理文章开头已指出.
- /**
- * TCP 帧解码为 Java 消息对象
- *
- * @param head 主帧头
- * @param subMsgId 子帧功能位
- * @param data 子帧数据
- * @return 已解码的 Java 对象
- */
- public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) {
- BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId);
- return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data);
- }
来源: https://juejin.im/post/5ae2bcc1518825673027df23