文章导航
Redis 源码系列的初衷, 是帮助我们更好地理解 Redis, 更懂 Redis, 而怎么才能懂, 光看是不够的, 建议跟着下面的这一篇, 把环境搭建起来, 后续可以自己阅读源码, 或者跟着我这边一起阅读. 由于我用 c 也是好几年以前了, 些许错误在所难免, 希望读者能不吝指出.
曹工说 Redis 源码 (1)-- Redis debug 环境搭建, 使用 clion, 达到和调试 java 一样的效果
曹工说 Redis 源码 (2)-- Redis server 启动过程解析及简单 c 语言基础知识补充
曹工说 Redis 源码 (3)-- Redis server 启动过程完整解析 (中)
曹工说 Redis 源码 (4)-- 通过 Redis server 源码来理解 listen 函数中的 backlog 参数
本讲主题
本讲将延续第三讲的主题, 将启动过程的主体讲完. 为了保证阅读体验, 避免过于突兀, 可以先阅读第三讲. 本讲, 主要讲解余下的部分:
创建 pid 文件
加载 rdb,aof, 获取数据
运行事件处理器, 准备处理事件, EventLoop 每次处理事件前的前置工作
创建 pid 文件
pid, 也就是进程 id, 以后台模式运行时, Redis 会把自己的 pid, 写入到一个文件中, 默认的文件路径和名称为:/var/run/Redis.pid.
配置文件可配:
- # When running daemonized, Redis writes a pid file in /var/run/Redis.pid by
- # default. You can specify a custom pid file location here.
- pidfile /var/run/Redis.pid
这部分代码非常简洁:
- void createPidFile(void) {
- // 1
- FILE *fp = fopen(server.pidfile, "w");
- if (fp) {
- // 2
- fprintf(fp, "%d\n", (int) getpid());
- // 3
- fclose(fp);
- }
- }
1, 打开文件, 这里的 pidfile 就是前面的文件名,/var/run/Redis.pid, 配置文件可以对其修改. 模式为 w, 表示将对其写入.
2, 调用 pid, 获取当前进程的 pid, 写入该文件描述符
3, 关闭文件.
加载 rdb,aof
在启动时, 会检查 aof 和 rdb 选项是否打开, 如果打开, 则会去加载数据, 这里要注意的是, Redis 总是先查看是否有 aof 开关是否打开; 打开的话, 则直接使用 aof;
如果 aof 没打开, 则去加载 rdb 文件.
- void loadDataFromDisk(void) {
- // 记录开始时间
- long long start = ustime();
- // AOF 持久化已打开
- if (server.aof_state == REDIS_AOF_ON) {
- // 尝试载入 AOF 文件
- if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
- // 打印载入信息, 并计算载入耗时长度
- redisLog(REDIS_NOTICE, "DB loaded from append only file: %.3f seconds",
- (float) (ustime() - start) / 1000000);
- // AOF 持久化未打开
- } else {
- // 尝试载入 RDB 文件
- if (rdbLoad(server.rdb_filename) == REDIS_OK) {
- // 打印载入信息, 并计算载入耗时长度
- redisLog(REDIS_NOTICE, "DB loaded from disk: %.3f seconds",
- (float) (ustime() - start) / 1000000);
- }
- }
- }
加载的过程, 现在来讲, 不太合适, 比如以 aof 为例, aof 文件中存储了一条条的命令, 加载 aof 文件的过程, 其实就会在进程内部创建一个 fake client(源码中就是这样命名, 也就是一个假的客户端), 来一条条地发送 aof 文件中的命令进行执行.
这个命令执行的过程, 现在讲会有点早, 所以 aof 也放后面吧, 讲了命令执行再回头看这块.
事件循环结构体讲解
核心流程如下:
- // 1
- aeSetBeforeSleepProc(server.el, beforeSleep);
- // 2
- aeMain(server.el);
先看 2 处, 这里传入 server 这个全局变量中的 el 属性, 该属性就代表了当前事件处理器的状态, 其定义如下:
- // 事件状态
- aeEventLoop *el;
el, 实际就是 EventLoop 的简写; 结构体 aeEventLoop, 里面维护了: 当前使用的多路复用库的函数, 当前注册到多路复用库, 在发生读写事件时, 需要被通知的 socket 文件描述符, 以及其他一些东西.
- typedef struct aeEventLoop {
- // 目前已注册的最大描述符
- int maxfd; /* highest file descriptor currently registered */
- // 目前已追踪的最大描述符
- int setsize; /* max number of file descriptors tracked */
- // 用于生成时间事件 id
- long long timeEventNextId;
- // 最后一次执行时间事件的时间
- time_t lastTime; /* Used to detect system clock skew */
- // 1 已注册的文件事件
- aeFileEvent *events; /* Registered events */
- // 2 已就绪的文件事件
- aeFiredEvent *fired; /* Fired events */
- // 3 时间事件
- aeTimeEvent *timeEventHead;
- // 事件处理器的开关
- int stop;
- // 4 多路复用库的私有数据
- void *apidata; /* This is used for polling API specific data */
- // 5 在处理事件前要执行的函数
- aeBeforeSleepProc *beforesleep;
- } aeEventLoop;
1 处, 注册到多路复用库, 需要监听的 socket 文件描述符事件, 比如, 某 socket 的可读事件;
2 处, 以 select 或者 epoll 这类多路复用库为例, 在一次 select 中, 如果发现某些 socket 事件已经满足, 则, 这些 ready 的事件, 会被存放到本属性中.
因为我的描述比较抽象, 这里拿一段 man select 中的说明给大家看下:
select() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.
直译一下: select() 允许一个程序去监听多个文件描述符, 等待直到 1 个或多个文件描述符变成 ready 状态, 该状态下, 可以不阻塞地读写该文件描述符.
3 处, 事件事件, 主要用来周期执行, 执行一些 Redis 的后台任务, 如删除过期 key, 后面细讲.
4 处, 指向当前正在使用的多路复用库的相关数据, 目前 Redis 支持: select,epoll,kqueue,evport
5 处, 在处理事件前, 要执行的一个函数
再回头来看前面的代码:
- // 1
- aeSetBeforeSleepProc(server.el, beforeSleep);
- aeMain(server.el);
这里的 1 处, 就是设置前面第 5 点提到的, 设置处理事件前, 先要执行的一个函数.
事件循环处理器的主循环
- void aeMain(aeEventLoop *eventLoop) {
- eventLoop->stop = 0;
- while (!eventLoop->stop) {
- // 如果有需要在事件处理前执行的函数, 那么运行它
- if (eventLoop->beforesleep != NULL)
- eventLoop->beforesleep(eventLoop);
- // 开始处理事件
- aeProcessEvents(eventLoop, AE_ALL_EVENTS);
- }
- }
可以看到, 一共 2 个部分, 首先执行 eventLoop 的事件处理前要执行的函数; 接着再开始处理事件.
事件处理前的前置执行函数
这里讲解下面这一句:
eventLoop->beforesleep(eventLoop);
这个函数, 在前面已经看到了, 被赋值为:
aeSetBeforeSleepProc(server.el, beforeSleep);
这个 beforeSleep 如下:
- void beforeSleep(struct aeEventLoop *eventLoop) {
- /* Run a fast expire cycle (the called function will return
- * ASAP if a fast cycle is not needed). */
- // 1 执行一次快速的主动过期检查
- if (server.active_expire_enabled && server.masterhost == NULL)
- activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
- // 2
- ...
- /* Write the AOF buffer on disk */
- // 3 将 AOF 缓冲区的内容写入到 AOF 文件
- flushAppendOnlyFile(0);
- /* Call the Redis Cluster before sleep function. */
- // 在进入下个事件循环前, 执行一些集群收尾工作
- if (server.cluster_enabled) clusterBeforeSleep();
- }
1, 这里会去执行主动的过期检查, 大致流程代码如下:
- void activeExpireCycle(int type) {
- /* This function has some global state in order to continue the work
- * incrementally across calls. */
- // 静态变量, 用来累积函数连续执行时的数据
- static unsigned int current_db = 0; /* Last DB tested. */
- ...
- unsigned int j, iteration = 0;
- // 默认每次处理的数据库数量
- unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
- // 函数开始的时间
- long long start = ustime(), timelimit;
- dbs_per_call = server.dbnum;
- timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100;
- timelimit_exit = 0;
- if (timelimit <= 0) timelimit = 1;
- // 1 遍历数据库
- for (j = 0; j <dbs_per_call; j++) {
- int expired;
- // 指向要处理的数据库
- redisDb *db = server.db + (current_db % server.dbnum);
- current_db++;
- do {
- unsigned long num, slots;
- long long now, ttl_sum;
- int ttl_samples;
- /* If there is nothing to expire try next DB ASAP. */
- // 2 获取数据库中带过期时间的键的数量 如果该数量为 0 , 直接跳过这个数据库
- if ((num = dictSize(db->expires)) == 0) {
- db->avg_ttl = 0;
- break;
- }
- // 3 获取数据库中键值对的数量
- slots = dictSlots(db->expires);
- // 当前时间
- now = mstime();
- // 每次最多只能检查 LOOKUPS_PER_LOOP 个键
- if (num> ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
- num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
- // 4 开始遍历数据库
- while (num--) {
- dictEntry *de;
- long long ttl;
- // 从 expires 中随机取出一个带过期时间的键
- if ((de = dictGetRandomKey(db->expires)) == NULL) break;
- // 计算 TTL
- ttl = dictGetSignedIntegerVal(de) - now;
- // 5 如果键已经过期, 那么删除它, 并将 expired 计数器增一
- if (activeExpireCycleTryExpire(db, de, now)) expired++;
- }
- // 6 为这个数据库更新平均 TTL 统计数据
- ...
- // 更新遍历次数
- iteration++;
- // 7 每遍历 16 次执行一次
- if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */
- (ustime() - start)> timelimit) {
- // 如果遍历次数正好是 16 的倍数
- // 并且遍历的时间超过了 timelimit
- // 那么断开 timelimit_exit
- timelimit_exit = 1;
- }
- // 8 已经超时了, 返回
- if (timelimit_exit) return;
- /* We don't repeat the cycle if there are Less than 25% of keys
- * found expired in the current DB. */
- // 如果已删除的过期键占当前总数据库带过期时间的键数量的 25 %
- // 那么不再遍历
- } while (expired> ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4);
- }
- }
这个函数, 删减了一部分, 留下了主流程:
1 处, 遍历数据库, 一般就是遍历 16 个库
2 处, 获取当前库中, 过期键的数量, 过期键都存储在 db->expires 中, 只需要算这个 map 的 size 即可; 如果没有要过期的, 处理下一个库
3 处, 获取过期键的数量
4 处, 开始遍历当前数据库的过期键, 最多遍历 20 次, 这里的 num, 被
ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP
赋值, 这个值定义为 20, 也就是说, 每次扫描一个库中, 20 个过期键
5 处, 如果键已过期, 则将这个 key 过期掉, 比如从当前数据库删除, 发布事件等等
6 处, 计算一些统计数据
7 处, 遍历 16 次, 检查下是否已经执行了足够长的时间; 因为 Redis 是单线程的, 不能一直执行过期键清理任务, 还要处理客户端请求呢, 所以, 这里每执行 16 次循环, 就检查下时间, 看看是否已经超时, 超时直接返回.
8 处, 超时返回
讲完了主动过期, 接着讲前面的流程, 2 处, 涉及一些主从复制相关的东西, 这块放到后面吧
3 处, 将 aof 从缓存中, 刷到磁盘
这个方法比较长, 在后面分段讲解
刷新 aof 缓存到磁盘的执行过程
判断是否有正在进行中的任务
- void flushAppendOnlyFile(int force) {
- ssize_t nwritten;
- int sync_in_progress = 0;
- // 缓冲区中没有任何内容, 直接返回
- if (sdslen(server.aof_buf) == 0) return;
- // 策略为每秒 FSYNC
- if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
- //1 是否有 SYNC 正在后台进行?
- sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
1 处, 会去判断一个全局变量, 该变量是一个队列, 用于存储后台任务. 另外一个后台线程 (没错, Redis 不是单纯的单线程, 还是有其他线程的), 会去该队列取任务, 取不到就阻塞; 取到了则执行. 而刷新 aof 到磁盘这种重 io 的工作, 就是被封装为一个任务, 丢到这个队列中的. 所以, 这里去判断队列的大小是否为 0.
- /* Return the number of pending jobs of the specified type.
- *
- * 返回等待中的 type 类型的工作的数量
- */
- unsigned long long bioPendingJobsOfType(int type) {
- unsigned long long val;
- pthread_mutex_lock(&bio_mutex[type]);
- // 1
- val = bio_pending[type];
- pthread_mutex_unlock(&bio_mutex[type]);
- return val;
- }
1 处这里的 val, 就是存储指定类型的任务的数量. 我们这里传入的 type 为 REDIS_BIO_AOF_FSYNC, 所以就是看看: aof 刷盘的任务数量.
调用 write 函数执行写入
- // 1
- nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
- if (nwritten != (signed)sdslen(server.aof_buf)) {
- // 2
- ...
- }else{
- // 3
- /* Successful write(2). If AOF was in error state, restore the
- * OK state and log the event. */
- // 写入成功, 更新最后写入状态
- if (server.aof_last_write_status == REDIS_ERR) {
- redisLog(REDIS_WARNING,
- "AOF write error looks solved, Redis can write again.");
- server.aof_last_write_status = REDIS_OK;
- }
- }
1 处, 执行写入, 将 server.aof_buf 这个缓冲区的内容, 写入 aof 文件, 写入的字节长度为 sdslen(server.aof_buf). 也就是, 将整个缓冲区写入.
2 处, 如果写入的长度, 不等于缓冲区的长度, 表示只写了一部分, 进入异常分支
为什么写入的会比预期的少, 我们看看官方说明:
- write() writes up to count bytes from the buffer pointed buf to the file referred to by the file descriptor fd.
- The number of bytes written may be Less than count if, for example, there is insufficient space on the underlying physical medium, or the RLIMIT_FSIZE resource limit is encountered (see setrlimit(2)), or the call was interrupted by a signal handler after having written Less than count bytes. (See also pipe(7).)
这里的第二段就说了, 可能是因为底层物理介质的空间不够; 进程的资源限制; 或者被中断.
3 处, 写入成功; 更新状态, 如果上一次 aof 写入状态为 error, 这次改为 ok
flush 到磁盘
前面 write 是写入到操作系统的 os cache 中, 但是还没有落盘. 必须执行 flush 之后, 才会刷盘.
- // 总是执行 fsnyc
- if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
- /* aof_fsync is defined as fdatasync() for Linux in order to avoid
- * flushing metadata. */
- // 1
- aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
- // 更新最后一次执行 fsnyc 的时间
- server.aof_last_fsync = server.unixtime;
- // 策略为每秒 fsnyc , 并且距离上次 fsync 已经超过 1 秒
- } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
- server.unixtime> server.aof_last_fsync)) {
- // 2 放到后台执行
- if (!sync_in_progress) aof_background_fsync(server.aof_fd);
- // 更新最后一次执行 fsync 的时间
- server.aof_last_fsync = server.unixtime;
- }
1 处, 如果 aof 策略为: AOF_FSYNC_ALWAYS, 则调用 fsync, 刷盘
2 处, 如果策略为每秒刷盘: AOF_FSYNC_EVERYSEC, 放到后台去刷盘. 这里的放到后台, 就是放到前面提到的任务队列中, 由其他线程去刷.
- void aof_background_fsync(int fd) {
- bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
- }
- void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
- struct bio_job *job = zmalloc(sizeof(*job));
- job->time = time(NULL);
- job->arg1 = arg1;
- job->arg2 = arg2;
- job->arg3 = arg3;
- pthread_mutex_lock(&bio_mutex[type]);
- // 1 将新工作推入队列
- listAddNodeTail(bio_jobs[type],job);
- bio_pending[type]++;
- pthread_cond_signal(&bio_condvar[type]);
- pthread_mutex_unlock(&bio_mutex[type]);
- }
这里的 1 处, 可以看到, 将任务丢到了队列中, 且前后进行了加锁. 因为这个队列, 是会被其他线程访问的, 所以为了线程安全, 进行了加锁.
- todo
- void aeMain(aeEventLoop *eventLoop) {
- eventLoop->stop = 0;
- while (!eventLoop->stop) {
- // 如果有需要在事件处理前执行的函数, 那么运行它
- if (eventLoop->beforesleep != NULL)
- // 1
- eventLoop->beforesleep(eventLoop);
- // 2 开始处理事件
- aeProcessEvents(eventLoop, AE_ALL_EVENTS);
- }
- }
启动做的事实在太多了, 本篇把 1 这里的这个函数讲了, 下篇才能讲 2.
总结
本篇主要讲了, Redis 启动过程中, 主循环的大流程, 以及在主循环去处理一个事件之前, 要执行的任务. 这个主循环如何处理事件, 放到下篇继续.
来源: https://www.cnblogs.com/grey-wolf/p/12709257.html