前言
了解 T-io 框架有些日子了, 并且还将它应用于实战, 例如 tio-websocket-server,tio-http-server 等. 但是由于上述两个 server 已经封装好, 直接应用就可以. 所以对于整个数据流通的过程不是很明朗, 甚至对于 hello-world 例子中的 encode,decode 作用并不理解. 于是乎想写一个更贴近实际应用的 Redis-client 来作为学习切入点, 虽然编码过程中困难重重, 不过最后还是实现了一个粗糙的客户端. 由于代码中大量参考了 Jedis 源码, 所以, 我给这个客户端起名 T-io+Redis=Tedis. 哈哈, 这些都不重要, 下文中将会记录出我的学习和开发历程.
Redis 通信协议
Redis Protocol https://redis.io/topics/protocol
在开发之前, 首先要去了解客户端和服务端的通信协议, 那么我们开发 Redis 客户端, 就要去看看 Redis 协议了. 所以, 下面要做的就是:
明确客户端发送给服务端的消息格式
明确服务端返回给客户端的消息格式
在此呢, 我只简单举一个 GET,SET 的例子, 其他的内容大家可以去看参考文档.
- //SET 命令
- set mykey myvalue
- //GET 命令
- get mykey
上述两个简单的命令, 根据 Redis 协议可以解析成如下内容
- //SET 命令
- *3\r\n$3\r\nset\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n
- //GET 命令
- *2\r\n$3\r\nget\r\n$5\r\nmykey\r\n
其中 *3 代表有三段内容, 即 SET,mykey,myvalue. 每一段内容之间由 CRLF(\r\n)隔开.$ 符号后边跟的数字就是数据字节数. 引用官方的一个图:
在 Jedis 源码中, 对于消息体的构造比较麻烦, 我看的也是云里雾里的, 所以在 Tedis 的实现中我才用了最简单的拼接方式. 即 StringBuilder 根据规则拼接字符串, 然后调用 getBytes 方法获取 byte[]. 示例代码如下:
- public static byte[] buildCommandBody(final ProtocolCommand cmd,String... args) {
- StringBuilder builder = new StringBuilder();
- //*[num]
- builder.append('*')
- // 命令数(1) + 参数的个数
- .append(1 + args.length);
- appendCrLf(builder)
- // 命令长度 $[cmd_length]
- .append("$")
- .append(cmd.getName().length());
- appendCrLf(builder)
- // 命令内容 cmd
- .append(cmd.getName());
- appendCrLf(builder);
- // 遍历参数, 按照 $[num]\r\n[content]\r\n 的格式拼接
- for (String arg : args) {
- builder.append("$")
- .append(arg.length());
- appendCrLf(builder)
- .append(arg);
- appendCrLf(builder);
- }
- // 最后转换为 byte[], 此处使用 Jedis 中的 SafeEncoder
- return SafeEncoder.encode(builder.toString());
- }
调用示例:
- public static void main(String[] args){
- Protocol.buildCommandBody(Protocol.Command.SET,"key","value");
- }
打印结果:
- *3
- $3
- SET
- $3
- key
- $5
- value
那么到此为止, 我们已经了解了如何构造发送给服务端的消息, 那么如何解析服务端返回的消息呢?
Redis 命令会返回多种不同类型的回复.
通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:
状态回复 (status reply) 的第一个字节是 "+"
错误回复 (error reply) 的第一个字节是 "-"
整数回复 (integer reply) 的第一个字节是 ":"
批量回复 (bulk reply) 的第一个字节是 "$"
多条批量回复 (multi bulk reply) 的第一个字节是 "*"
时间有限, 我也只是完成了状态回复和批量回复的部分功能, 下文中将以这两种回复作为讲解示例.
T-io 登场
由于只是客户端的开发, 所以这里我们只会用到 TioClient. 所以, 我们先把 Redis-Server 连接上. ClientAioHandler,ClientAioListener,ClientGroupContext 自然是少不了的啦, 直接上代码吧.
初始化一个 ServerNode
Node serverNode = new Node("127.0.0.1",6379);
初始化一个 ClientGroupContext, 它依赖于 ClientAioHandler,ClientAioListener
ClientGroupContext clientGroupContext = new ClientGroupContext(tioClientHandler, aioListener, null);
初始化一个 TioClient
TioClient tioClient = new TioClient(clientGroupContext);
最后连接服务器, 如果没有什么异常打印的话, 就连接成功啦
- // 返回的 ClientChannelContext 用于发送消息使用
- ClientChannelContext clientChannelContext = tioClient.connect(serverNode);
恭喜你, 一个 Redis 客户端宝宝就此诞生, 只不过它还不会说话. 结合上文协议部分的内容, 我们发送一条消息给服务器. 首先定义消息包:
- public class TedisPacket extends Packet {
- private byte[] body;
- //getter setter
- }
然后调用 Tio.send 方法就可以啦.
Tio.send(clientChannelContext, packet);
如果你已经看懂了上半部分, 那么你就会知道这里 TedisPacket 中的 body 的值就是通过 Protocol.buildCommandBody(Protocol.Command.SET,"key","value"); 来生成的. 不要忘了 `ClientAioHandler.encode'方法哦.
- @Override
- public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
- TedisPacket tedisPacket = (TedisPacket) packet;
- byte[] body = tedisPacket.getBody();
- int bodyLen = 0;
- if (body != null) {
- bodyLen = body.length;
- }
- // 只是简单将 body 放入 ByteBuffer .
- ByteBuffer buffer = ByteBuffer.allocate(bodyLen);
- buffer.put(body);
- return buffer;
- }
到此为止, 客户端向服务器发送消息的内容已经写完了. 下面将介绍如何解析服务端的响应.
当服务器正常, 并且发送到服务器的消息格式符合 RESP 协议的话, 那么服务器会返回你相应的内容, 比如我们发送 SET 命令, 服务器的正常响应是 + OK\r\n. 下面我们看
ClientAioHandler.decode
方法. 当我批量向服务器发送消息时, 服务器给我的响应也是批量接收到的. 打印结果如下:
那么问题来了, 我们只想要每一次发送对应一个 OK. 所以, 原谅我这个菜鸟, 我才明白 decode 方法的目的. 那么, 我们就去解析这个内容. 解析过程有几个需要关注的地方:
遇到第一个 \r 的时候, 下一个字节一定是'\n'否则, 作为解析失败处理.
\r\n 之后停止本轮解析, 返回解析结果.
基于上述注意事项, 解析代码如下:(应该会有更优秀的方法)
先获取第一个字节, 它应该是 + - $ : * 的其中一个, 如果不是的话, 说明消息可能是上一次不完整导致的, 等待下次解析.
byte first = buffer.get();
以 +OK\r\n 举例:
- private TedisPacket readSingleLinePacket(ByteBuffer buffer,int limit,int position) throws AioDecodeException {
- byte[] body = new byte[limit - position];
- int i = 0;
- // 结束标志
- boolean endFlag = false;
- while (buffer.position() <= limit) {
- byte b = buffer.get();
- // 如果是 \ r
- if (BufferReader.isCr(b)) {
- byte c = buffer.get();
- // 如果不是 \ n 抛出异常
- if (!BufferReader.isLf(c)) {
- throw new AioDecodeException("unexpected redis server response");
- }
- // 结束解析
- endFlag = true;
- break;
- } else {
- body[i++] = b;
- }
- }
- // 如果此次解析一直没有遇到 \ r\n, 则返回 null, 等待下次解析
- if (!endFlag) {
- return null;
- }
- TedisPacket packet = new TedisPacket();
- packet.setBody(body);
- return packet;
- }
写完解析代码之后, 再一次调试结果如下, 可以看到数据以 5 个字节减少, 说明数据包被正确解析了. 打印内容来自
- Tio:DecodeRunnable.java
- .
到此为止, 我们完成了消息的发送和接收, 但是问题来了, 由于消息是异步接收, 那我们如何才能让客户端知道命令调用是否成功呢? 注意, 下文中的内容仅为个人理解, 错误之处恳请指正
既然 Redis 是单线程处理的, 那么我是否可以理解为, 消息的处理就是先到先处理, 后到后处理呢? 所以, 我的解决方式是通过
LinkedBlockingQueue
. 当解析完一个包之后, 将这个包放入阻塞队列中.
- @Override
- public void handler(Packet packet, ChannelContext channelContext) throws Exception {
- TedisPacket responsePacket = (TedisPacket) packet;
- if (responsePacket != null) {
- QueueFactory.get(clientName).put(responsePacket);
- }
- }
同步接收返回消息:
- private String getReponse() {
- for (; ; ) {
- try {
- TedisPacket packet = QueueFactory.get(clientName).take();
- return packet.hasBody() ? SafeEncoder.encode(packet.getBody()) : null;
- } catch (InterruptedException e) {
- e.printStackTrace();
- return null;
- }
- }
- }
所以 set 代码就变成这样:
- @Override
- public String set(String key, String value) {
- client.set(key,value);
- return client.getStatusCodeReply();
- }
OK, 消息接收这块是基于我的理解, 我也不知道对不对, 而且, 其中的 BUG 肯定也是多的数不胜数, 没关系, 抱着学习的心态慢慢去完善就好了. Jedis 也不是一次两次就写成的对吧.
Tedis 与 Jedis
在开发过程中, 我阅读了很多 Jedis 的源代码, 大体思路能看懂, 可是很多细节处理对我来说就比较难了, 大神的代码只可膜拜. 不过也给了我很多启发. 最后不知天高地厚的和人家做一下对比吧.
- public static void main(String[] args) {
- Jedis tedis = new Jedis("192.168.1.225", 6379);
- long start = SystemTimer.currentTimeMillis();
- for (int i = 0; i < 200; i++) {
- tedis.set("tedis", "tedis");
- }
- tedis.get("tedis");
- long end = SystemTimer.currentTimeMillis();
- System.out.println("总共用时:" + (end - start) + "ms, 平均用时:" + ((end - start) / 100) + "ms");
- }
Jedis 结果: 总共用时: 262ms, 平均用时: 2ms
Tedis 结果: 总共用时: 390ms, 平均用时: 3ms
那么这一毫秒差在哪里呢?
总结
一篇博客简单介绍了 Redis 客户端的开发过程, 当然对于成熟的客户端 Jedis 来说, 也就是一个 HelloWorld, 不过这有什么关系呢? 知其然, 更要知其所以然. 看了大神的代码才知道自己有多渺小哦. 继续加油~~
源码地址: https://github.com/fanpan26/tedis https://github.com/fanpan26/tedis
来源: https://www.cnblogs.com/panzi/p/10213821.html