经过前面对 Redis 源码的了解, 令人印象深刻的也许就是 Redis 各种节约内存手段. 而 Redis 对于内存的节约可以说是费尽心思, 今天我就再来介绍一种 Redis 为了节约内存而创造的存储结构 -- 压缩列表(ziplist).
存储结构
ziplist 是 zset 和 hash 在元素数量较少时使用的一种存储结构. 它的特点存储于一块连续的内存, 元素与元素之间没有空隙. 我们可以用 DEBUG OBJECT 命令来查看一个 zset 的编码格式:
- 127.0.0.1:6379> ZADD db 1.0 MySQL 2.0 mongo 3.0 Redis
- (integer) 3
- 127.0.0.1:6379> DEBUG OBJECT db
- Value at:0x7f5bf1908070 refcount:1 encoding:ziplist serializedlength:39 lru:9589668 lru_seconds_idle:12
那么 ziplist 究竟是一种怎样的结构的, 话不多说, 直接看图.
ZIPLIST OVERALL LAYOUT
接下来我们挨个解释一下每一部分存储的内容:
zlbytes:32 位无符号整数, 存储的是包括它自己在内的整个 ziplist 所占用的字节数
zltail:32 位无符号整数, 存储的是最后一个 entry 的偏移量, 用来快速定位最后一个元素
zllen:16 位无符号整数, 用于存储 entry 的数量, 当元素数量大于 216-2 时, 这个值就被设置为 216-1. 我们想知道元素的数量就需要遍历整个列表
entry: 表示存储的元素
zlend:8 位无符号整数, 用于标识整个 ziplist 的结尾. 它的值是 255.
ZIPLIST ENTRIES
了解了 ziplist 的大概结构以后, 我们剖析更深一层的 entry 的结构.
对于每个 entry 都有两个前缀
prevlen: 表示前一个元素的长度, 它与 zltail 字段结合使用可以实现快速的从后向前定位元素
encoding: 表示元素的编码格式, 它用来表示元素是整数还是字符串, 如果是字符串, 也表示字符串的长度
entry-data: 元素的数据, 它并不是一定存在, 对于某些编码而言, 编码本身也是数据, 因此这一部分可以省略
这里要解释一点, prevlen 是一个变长的整数, 当前一个元素的长度小于 254 时, 它仅需要一个字节 (8 位无符号整数) 表示, 如果元素的长度大于(或等于)254 字节, prevlen 就用 5 个字节来表示, 其中第一个字节是 254, 后 4 个字节表示前一个元素的长度.
encoding 字段决定了元素的内容. 如果 entry 存储的是字符串, 那么就通过 encoding 的前两位来区分不同长度的字符串, 如果 entry 存储的内容是整数, 那么前两位都会被设置为 1, 再后面两位用来区分整数的类型.
|00pppppp|: 字符串长度小于 63 字节, pppppp 是 6 位无符号整数, 用来表示字符串长度
|01pppppp|qqqqqqqq|: 字符串长度小于等于 16383 字节, 后面 14 位表示字符串长度
|10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt|: 字符串长度大于等于 16384 字节, 后 4 个字节表示字符串长度
|11000000|:16 位整数, 后面跟 2 个字节存储整数
|11010000|:32 位整数, 后面跟 4 个字节存储整数
|11100000|:64 位整数, 后面跟 8 个字节存储整数
|11110000|:24 位整数, 后面跟 3 个字节存储整数
|11111110|:8 位整数, 后面跟 1 个字节存储整数
|1111xxxx|:(xxxx 取值从 0000 到 1101)表示 0 到 12 的整数, 读到的 xxxx 减 1 为实际表示的整数. 这就是前面提到的省略 entry-data 的情况
|11111111|:ziplist 的结束值, 也就是 zlend 的值
说了这么多, 也许你还是不太清楚 ziplist 存储的内容究竟要表示什么, 我们还是来举一个栗子
[0f 00 00 00] [0c 00 00 00] [02 00] [00 f3] [02 f6] [ff]
这是一个实际的 ziplist 存储的内容, 我们就一起来解读一下.
首先是 4 个字节的 zlbytes,ziplist 一共是 15 个字节, 因此 zlbytes 的值是 0x0f; 接下来是 4 个字节的 zltail, 偏移量是 12, 因此 zltail 的值是 0x0c; 后两个字节是 zllen, 也就是一共两个元素; 第一个元素的 prevlen 为 00,0xf3 表示元素值是 2:1111 0011 符合上述第 9 条, 读到 xxxx 为 3, 需要减 1, 因此实际值是 2; 第二个元素同理, 0xf6 表示的值是 5, 最后 0xff 表示这个 ziplist 结束.
这时, 我向这个 ziplist 中又加了一个元素, 是一个字符串, 请大家自行解读下面的 entry(注意, 只是 entry). 友情提示: 需要查询 ASCII 码表来解读
[02] [0b] [48 65 6c 6c 6f 20 57 6f 72 6c 64]
增加元素
了解了 ziplist 的存储之后, 我们再来看一下 ziplist 是如何增加元素的. 前面提到过, ziplist 存储结构用于元素数量少的 zset 和 hash. 那么我们就以 zset 为例, 一起追踪源码, 了解 ziplist 增加元素的过程.
我们从 ZADD 命令执行的函数 zaddCommand()开始.
- void zaddCommand(client *c) {
- zaddGenericCommand(c,ZADD_NONE);
- }
它只是简单调用了 zaddGenericCommand()函数, 传入了客户端对象 c 和一个标志位, 表示要执行 ZADD 命令, 因为这个函数同样也是 ZINCRBY 要执行的函数(传入的标志是 ZADD_INCR).
而在 zaddGenericCommand()函数中, 首先对参数进行了处理, 并且做了一些校验.
- /* Lookup the key and create the sorted set if does not exist. */
- zobj = lookupKeyWrite(c->db,key);
- if (zobj == NULL) {
- if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
- if (server.zset_max_ziplist_entries == 0 ||
- server.zset_max_ziplist_value <sdslen(c->argv[scoreidx+1]->ptr))
- {
- zobj = createZsetObject();
- } else {
- zobj = createZsetZiplistObject();
- }
- dbAdd(c->db,key,zobj);
- } else {
- if (zobj->type != OBJ_ZSET) {
- addReply(c,shared.wrongtypeerr);
- goto cleanup;
- }
- }
然后判断 key 是否存在, 如果存在, 验证数据类型; 否则创建一个新的 zset 对象. 这里可以看到, 当
zset_max_ziplist_entries 为 0 或者第一个元素的长度大于 zset_max_ziplist_value 时, 创建 zset 对象, 否则创建 ziplist 对象. 创建好对象之后, 就开始遍历元素, 执行 zsetAdd 函数了:
- for (j = 0; j <elements; j++) {
- double newscore;
- score = scores[j];
- int retflags = flags;
- ele = c->argv[scoreidx+1+j*2]->ptr;
- int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
- if (retval == 0) {
- addReplyError(c,nanerr);
- goto cleanup;
- }
- if (retflags & ZADD_ADDED) added++;
- if (retflags & ZADD_UPDATED) updated++;
- if (!(retflags & ZADD_NOP)) processed++;
- score = newscore;
- }
这个函数用来增加新元素或者更新元素的 score. 这个函数中判断了 zset 对象的编码方式, 对压缩列表 ziplist 和跳跃列表 skiplist 分开处理, 跳跃列表是 zset 的另一种编码方式, 这个我们以后再介绍, 本文我们只关注 ziplist.
- if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
- unsigned char *eptr;
- if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
- /* NX? Return, same element already exists. */
- if (nx) {
- *flags |= ZADD_NOP;
- return 1;
- }
- /* Prepare the score for the increment if needed. */
- if (incr) {
- score += curscore;
- if (isnan(score)) {
- *flags |= ZADD_NAN;
- return 0;
- }
- if (newscore) *newscore = score;
- }
- /* Remove and re-insert when score changed. */
- if (score != curscore) {
- zobj->ptr = zzlDelete(zobj->ptr,eptr);
- zobj->ptr = zzlInsert(zobj->ptr,ele,score);
- *flags |= ZADD_UPDATED;
- }
- return 1;
- } else if (!xx) {
- /* Optimize: check if the element is too large or the list
- * becomes too long *before* executing zzlInsert. */
- zobj->ptr = zzlInsert(zobj->ptr,ele,score);
- if (zzlLength(zobj->ptr)> server.zset_max_ziplist_entries)
- zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
- if (sdslen(ele)> server.zset_max_ziplist_value)
- zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
- if (newscore) *newscore = score;
- *flags |= ZADD_ADDED;
- return 1;
- } else {
- *flags |= ZADD_NOP;
- return 1;
- }
- }
可以看到, 这里首先调用 zzlFind()函数查找对应的元素, 如果元素存在, 那么就判断是否包含参数 NX 或者是否是 INCR 操作. 如果修改了元素的分数, 则先删除原有的元素, 再重新增加; 如果元素不存在, 就直接执行 zzlInsert()函数, 再 insert 之后, 会判断是否需要改为跳跃列表存储. 这里有两个条件:
zset 元素数量大于 zset_max_ziplist_entries(默认 128)
添加的元素长度大于 zset_max_ziplist_value(默认 64)
满足任意一个条件, zset 都会使用跳跃列表来存储.
我们继续追踪 zzlInsert()函数.
- unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
- unsigned char *eptr = ziplistIndex(zl,0), *sptr;
- double s;
- while (eptr != NULL) {
- sptr = ziplistNext(zl,eptr);
- serverAssert(sptr != NULL);
- s = zzlGetScore(sptr);
- if (s> score) {
- /* First element with score larger than score for element to be
- * inserted. This means we should take its spot in the list to
- * maintain ordering. */
- zl = zzlInsertAt(zl,eptr,ele,score);
- break;
- } else if (s == score) {
- /* Ensure lexicographical ordering for elements. */
- if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele))> 0) {
- zl = zzlInsertAt(zl,eptr,ele,score);
- break;
- }
- }
- /* Move to next element. */
- eptr = ziplistNext(zl,sptr);
- }
- /* Push on tail of list when it was not yet inserted. */
- if (eptr == NULL)
- zl = zzlInsertAt(zl,NULL,ele,score);
- return zl;
- }
它首先定位了 zset 的第一个元素, 如果该元素不为空, 就比较该元素的分数 s 与要插入的元素分数 score, 如果 s>score, 就插入到当前位置, 如果分数相同, 则比较元素(按字典序). 插入后, 将后面的元素依次移到下一位.
- unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
- unsigned char *sptr;
- char scorebuf[128];
- int scorelen;
- size_t offset;
- scorelen = d2string(scorebuf,sizeof(scorebuf),score);
- if (eptr == NULL) {
- zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
- zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
- } else {
- /* Keep offset relative to zl, as it might be re-allocated. */
- offset = eptr-zl;
- zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
- eptr = zl+offset;
- /* Insert score after the element. */
- serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
- zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
- }
- return zl;
- }
在 zzlInsertAt()函数中, 主要是调用了 ziplistPush()或者 ziplistInsert()将元素和分数插入列表尾部或中间. 插入顺序是先插入元素, 然后插入分数.
接下来就到了 ziplist.c 文件中, 真正向压缩列表中插入元素了. 关键代码在__ziplistInsert()函数中.
首先需要计算插入位置前一个元素的长度, 存储到当前 entry 的 prevlen.
- if (p[0] != ZIP_END) {
- ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
- } else {
- unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
- if (ptail[0] != ZIP_END) {
- prevlen = zipRawEntryLength(ptail);
- }
- }
这里区分了是否是在尾部插入元素的情况, 如果是在尾部, 就可以通过 ziplist 中的 zltail 字段直接定位. 接下来就是尝试对插入的元素进行编码, 判断是否可以存储为整数, 如果不能, 就按照字符串的编码格式来存储.
- if (zipTryEncoding(s,slen,&value,&encoding)) {
- /* 'encoding' is set to the appropriate integer encoding */
- reqlen = zipIntSize(encoding);
- } else {
- /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
- * string length to figure out how to encode it. */
- reqlen = slen;
- }
这一步判断是节省内存的关键, 它会使用我们前面介绍的尽量小的编码格式来进行编码. 编码完成后就要计算当前 entry 的长度, 包括 prevlen,encoding 和 entry-data, 并且需要保证后一个 entry(如果有的话)的 prevlen 能够保存当前 entry 的长度. 这里调用的是 zipPrevLenByteDiff()函数, 需要的 prevlen 的长度和现有的 prevlen 的长度的差值, 也就是说如果返回为整数, 表示需要更多空间.
在这之后就要调用 zrealloc()来扩展空间了. 这里有可能会在原来的基础上进行扩展, 也有可能重新分配一块内存, 然后将原来的 ziplist 整体迁移. 如果 ziplist 占用较大内存时, 整体迁移的代价是很高的. 有了足够的空间之后, 就是把当前位置的 entry 向后移一位了, 然后要修改这个 entry 的 prevlen. 更新 zltail.
- if (nextdiff != 0) {
- offset = p-zl;
- zl = __ziplistCascadeUpdate(zl,p+reqlen);
- p = zl+offset;
- }
nextdiff 是前面 zipPrevLenByteDiff()函数的返回值, 它不为 0 表示需要更多空间(小于 0 时被置为 0). 这时后面的元素需要级联更新. 所有的这些处理完毕之后, 我们终于可以把要插入的 entry 写入当前位置了, 并且将 ziplist 的长度加 1.
级联更新
如果一个 entry 的长度小于 254 字节, 那么后一个元素的 prevlen 就用一个字节来存储, 否则就要用 5 个字节存储. 当我们插入一个元素时, 如果它的长度大于 253 字节, 那么原来的 entry 就可能从 1 个字节变成 5 个字节, 而如果由于这一变化导致这个 entry 的长度大于 254 字节, 那么后面的元素也要更新. 到后面甚至有可能导致重新分配内存的问题, 所以级联更新是一件很可怕的事情.
接下来就通过源码, 看一下级联更新的具体步骤.(查看 ziplist.c 文件的__ziplistCascadeUpdate 函数)
首先, 判断当前 entry 是否是最后一个, 如果是, 则跳出级联更新.
if (p[rawlen] == ZIP_END) break;
接着判断了下一个 entry 的 prevlen 长度是否发生变化, 如果没有变化, 也不用继续进行级联更新.
if (next.prevrawlen == rawlen) break;
而如果下一个 entry 的 prevlen 长度需要扩展, 那么就先调用 ziplistResize 扩展内存, 然后要更新 zltail. 要将后面的 entry 向后移动, 再开始判断下一个 entry 是否需要更新.
- if (next.prevrawlensize <rawlensize) {
- /* The "prevlen" field of "next" needs more bytes to hold
- * the raw length of "cur". */
- offset = p-zl;
- extra = rawlensize-next.prevrawlensize;
- zl = ziplistResize(zl,curlen+extra);
- p = zl+offset;
- /* Current pointer and offset for next element. */
- np = p+rawlen;
- noffset = np-zl;
- /* Update tail offset when next element is not the tail element. */
- if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
- ZIPLIST_TAIL_OFFSET(zl) =
- intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
- }
- /* Move the tail to the back. */
- memmove(np+rawlensize,
- np+next.prevrawlensize,
- curlen-noffset-next.prevrawlensize-1);
- zipStorePrevEntryLength(np,rawlen);
- /* Advance the cursor */
- p += rawlen;
- curlen += extra;
- }
如果后面的 entry 的 prevlen 大于需要的长度呢, 此时应该收缩 prevlen, 如果要进行收缩, 那么可能会继续级联更新. 这太麻烦了, 所以这里选择了浪费一些空间, 用 5 个字节的空间来存储 1 个字节可以存储的内容. 如果 prevlen 的长度等于需要的长度, 就直接更新内容.
- if (next.prevrawlensize> rawlensize) {
- /* This would result in shrinking, which we want to avoid.
- * So, set "rawlen" in the available bytes. */
- zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
- } else {
- zipStorePrevEntryLength(p+rawlen,rawlen);
- }
- /* Stop here, as the raw length of "next" has not changed. */
- break;
除了新增操作以外, 删除操作也有可能引起级联更新. 假设我们有 3 个 entry 是下面的情况
我们可以知道, entry2 的 prevlen 需要 5 个字节, entry3 的 prevlen 只需要 1 个字节. 而如果我们删除了 entry2, 那么 entry3 的 prevlen 就需要扩展到 5 个字节, 这一操作就有可能引起级联更新, 后面的情况和新增节点时一样.
总结
最后做一个总结:
压缩列表是 zset 和 hash 元素个数较少时的存储结构
ziplist 由 zlbytes,zltail,zllen,entry,zlend 这五部分组成
每个 entry 由 prevlen,encoding 和 entry-data 三部分组成
ziplist 增加元素时, 需要重新计算插入位置的 entry 的 prevlen(prevlen 的长度为 1 字节或 5 字节), 这一操作有可能引起级联更新.
来源: https://juejin.im/post/5c9b7b085188251e406491d3