1. 概述
Memcached 网络模块是基于 libevent 库开发的, 主要分为两个模块: 连接监听线程, 工作线程. 连接监听线程是用来监听来自客户端连接的, 工作线程主要是用来完成具体业务逻辑处理.
网络模块模型
网络模块时序图
2. 连接监听线程 (主线程)
1. 初始化主线程 libevent 实例, 用于监听来自客户端的连接
2.work 线程的资源分配与初始化
- int main (int argc, char **argv) {
- //.......
- /* initialize main thread libevent instance */
- #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER>= 0x02000101
- /* If libevent version is larger/equal to 2.0.2-alpha, use newer version */
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- main_base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- #else
- /* Otherwise, use older API */
- main_base = event_init();
- //main_base 主线程的 libevent 实例, 主要用于监听来自客户端连接
- #endif
- //.......
- //work 线程相关初始化
- memcached_thread_init(settings.num_threads, storage);
- //.......
- errno = 0;
- // 服务端 socket 绑定 ip 与端口进行连接监听
- if (settings.port && server_sockets(settings.port, tcp_transport,
- portnumber_file)) {
- vperror("failed to listen on TCP port %d", settings.port);
- exit(EX_OSERR);
- }
- //.......
- /* enter the event loop */
- if (event_base_loop(main_base, 0) != 0) {
- retval = EXIT_FAILURE;
- }
- //.......
- return retval
- }
server_sockets 根据配置信息开启服务端 socket 的监听
- static int server_sockets(int port, enum network_transport transport,
- FILE *portnumber_file) {
- if (settings.inter == NULL) {
- return server_socket(settings.inter, port, transport, portnumber_file);
- }else{
- // tokenize them and bind to each one of them..
- char *b;
- int ret = 0;
- char *list = strdup(settings.inter);
- //.......
- ret |= server_socket(p, the_port, transport, portnumber_file);
- //.......
- return ret;
- }
- }
- static int server_socket(const char *interface,
- int port,
- enum network_transport transport,
- FILE *portnumber_file) {
- //.......
- for (next= ai; next; next= next->ai_next) {
- //.......
- conn *listen_conn_add;
- // 服务端监听 fd 的生成
- if ((sfd = new_socket(next)) == -1) {
- //.......
- setsockopt(sfd,...);
- bind(sfd,...);
- listen(sfd,...);
- // 至此完成了服务端 socket 参数设置, bind,listen 等工作
- // 接下来就是将该 sfd 加入到 libevent 实例中进行 "客户端连接" 事件的监听
- //.......
- // 该函数功能执行相关 libevent 配置, 以及针对该套接字句柄作一些相关资源的配置
- conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)));
- //.......
- }
- }
- //.......
- }
3.work 线程
1. 监听来自客户端连接的可读事件
2. 获取客户端的操作请求, 完成具体业务逻辑处理功能
- //work 线程初始入口函数
- void memcached_thread_init(int nthreads, void *arg) {
- //.......
- //nthreads work 线程数目
- //threads 是 LIBEVENT_THREAD[nthreads] 数组, 每个 work 线程都有个自己对应的 LIBEVENT_THREAD
- // 是 LIBEVENT_THREAD 结构体保存了当前线程相关资源, 例如: libevent 实例, 套接字 fd 队列等
- threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
- for (i = 0; i <nthreads; i++) {
- //for 循环为每个线程生成一个管道
- int fds[2];
- // 管道初始化
- if (pipe(fds)) {
- perror("Can't create notify pipe");
- exit(1);
- }
- // 第 i 号 work 线程的接收管道
- threads[i].notify_receive_fd = fds[0];
- // 第 i 号 work 线程的发送管道
- threads[i].notify_send_fd = fds[1];
- //.......
- // 线程资源初始化, 主要是线程的 libevent 实例初始化, 套接字 fd 队列等资源初始化
- setup_thread(&threads[i]);
- //.......
- }
- //.......
- for (i = 0; i < nthreads; i++) {
- // 创建 work 线程
- //worker_libevent 是 work 线程函数
- create_worker(worker_libevent, &threads[i]);
- }
- }
- static void setup_thread(LIBEVENT_THREAD *me) {
- #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER>= 0x02000101
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- me->base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- #else
- me->base = event_init();
- #endif
- //me->base 将一个 libevent 实例保存到 LIBEVENT_THREAD 中
- // 每个线程都维护着自己的 libevent 实例, 用于监听文件句柄事件的发生
- //me->base 这个 libevent 实例主要用于监听 "客户端 fd 连接" 是否有可读事件
- //.......
- //notify_event 也是一个 libevent 实例, 主要用来监听当前 work 线程是否有 "管道事件"
- // 当主线程向 me->notify_send_fd 管道写'c'的时候
- //me->notify_receive_fd 接收管道有数据可读, 那么就会触发 thread_libevent_process 回调函数
- //thread_libevent_process 管道回调函数
- event_set(&me->notify_event, me->notify_receive_fd,
- EV_READ | EV_PERSIST, thread_libevent_process, me);
- event_base_set(me->base, &me->notify_event);
- if (event_add(&me->notify_event, 0) == -1) {
- fprintf(stderr, "Can't monitor libevent notify pipe\n");
- exit(1);
- }
- //.......
- // 线程套接字 fd 队列资源初始化
- me->new_conn_queue = malloc(sizeof(struct conn_queue));
- if (me->new_conn_queue == NULL) {
- perror("Failed to allocate memory for connection queue");
- exit(EXIT_FAILURE);
- }
- cq_init(me->new_conn_queue);
- //.......
- }
- // 创建线程 work 线程函数
- typedef void *(*func)(void *) pfunc;
- static void create_worker(pfunc func, void *arg) {
- pthread_attr_t attr;
- int ret;
- pthread_attr_init(&attr);
- if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
- fprintf(stderr, "Can't create thread: %s\n",
- strerror(ret));
- exit(1);
- }
- }
- /*
- * Worker thread: main event loop
- */
- static void *worker_libevent(void *arg) {
- LIBEVENT_THREAD *me = arg;
- //.......
- //libevent 事件循环监听, 如果有事件发生, 就会触发对应的业务回调函数
- event_base_loop(me->base, 0);
- event_base_free(me->base);
- return NULL;
- }
thread_libevent_process 管道回调函数, 例如: 当主线程监听到有客户端连接过滤, 主线程 accept 之后返回该客户端的 fd 句柄, 主线程不会将 fd 直接加入到 work 线程的 libevent 中进行可读事件的监听, 而是分两步进行, 第一步: 将 fd 封装一个 item 节点, 将 item 节点放入到一个 work 线程的 item 队列中. 第二步: 通过管道写入'c'消息, 通知对应的 work 线程到它自己的 item 队列中取 item 节点, 然后加入到自己的 libevent 中对 fd 进行可读事件的监听. 而这个 thread_libevent_process 就是管道可读事件的回调触发函数.
- // 管道事件回调函数
- static void thread_libevent_process(int fd, short which, void *arg) {
- // 线程资源
- LIBEVENT_THREAD *me = arg;
- CQ_ITEM *item;
- char buf[1];
- conn *c;
- unsigned int timeout_fd;
- //notify_receive_fd 读管道读 1 个字节数据
- if (read(fd, buf, 1) != 1) {
- if (settings.verbose> 0)
- fprintf(stderr, "Can't read from libevent pipe\n");
- return;
- }
- switch (buf[0]) {
- case 'c':
- //'c'消息就是主线程将客户端新连接加到 work 线程 new_conn_queue 队列中, 并通过 notify_send_fd 通知 work 线程去取
- item = cq_pop(me->new_conn_queue);
- //item 客户端 fd 封装结构
- if (NULL == item) {
- break;
- }
- switch (item->mode) {
- case queue_new_conn:
- //item->init_state = conn_new_cmd 新连接
- c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport,
- me->base);
- if (c == NULL) {
- //.......
- } else {
- //c->thread 保存 work 线程 LIBEVENT_THREAD 资源
- // 这样 fd 会话连接就可以访问该 work 主线程的 LIBEVENT_THREAD 相关资源
- c->thread = me;
- }
- break;
- case queue_redispatch:
- conn_worker_readd(item->c);
- break;
- }
- cqi_free(item);
- break;
- /* we were told to pause and report in */
- case 'p':
- register_thread_initialized();
- break;
- /* a client socket timed out */
- case 't':
- // 超时线程的通知消息
- // 超时线程会定时扫描 conns 连接会话, 如果发现某个连接会话超时了,
- // 那么就会写't'消息通知该会话对应的 work 线程关闭该会话
- if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
- if (settings.verbose> 0)
- fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
- return;
- }
- // 关闭会话
- conn_close_idle(conns[timeout_fd]);
- break;
- }
- }
1.conn_new 主要是针对新连接会话做资源分配与初始化, 以及将 fd 套接字加入到 libevent 中进行监听. 在主线程和 work 线程中都有涉及
2.drive_machine 是一个非常重要的函数, 它内部会根据 conn 连接的具体状态, 选择执行相应的业务处理逻辑, 我们这里可以理解为一个状态机
- conn *conn_new(const int sfd, enum conn_states init_state,
- const int event_flags,
- const int read_buffer_size, enum network_transport transport,
- struct event_base *base) {
- //conn 保存了 sfd 连接的相关信息, 我在这里将其理解为一个会话
- //conns[MAX] 数组就是维护着当前系统所有的连接会话
- conn *c;
- c = conns[sfd];
- if (NULL == c) {
- c = (conn *)calloc(1, sizeof(conn));
- //.......
- c->sfd = sfd;
- conns[sfd] = c;
- }
- // 事件回调函数 event_handler, 通过 c->state 状态值调用相应的业务逻辑处理
- c->state = init_state;
- //.......
- // 将套接字句柄 sfd 加入到 libevent 实例中进行监听
- // 当 sfd 套接字有可读事件发生的时候 libevent 会回调 event_handler 函数
- // 回调函数 event_handler 就是具体的业务逻辑处理
- event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
- event_base_set(base, &c->event);
- event_add(&c->event, 0);
- //.......
- return c;
- }
- // 事件回调函数
- void event_handler(const int fd, const short which, void *arg) {
- conn *c;
- c = (conn *)arg;
- //.......
- // 状态机
- drive_machine(c);
- //.......
- }
- static void drive_machine(conn *c) {
- bool stop = false;
- int sfd;
- //.......
- while (!stop) {
- switch(c->state) {
- case conn_listening:
- //.......
- sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
- //.......
- // 服务端 accept 一个新的客户端连接句柄 sfd
- // 将该 sfd 通过轮训的方式分发到对应 work 线程的 fd 队列中, 然后通过管道通知
- // 对应 work 线程到对应 fd 队列中去取客户端连接句柄
- //work 线程获取该 sfd 后将其加入 work 线程的 libevent 进行监听, work 线程 libevent 主要监听 sfd 是否有可读数据
- dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, c->transport);
- stop = true;
- break;
- case conn_waiting:
- //.......
- break;
- case conn_read:
- //.......
- break;
- case conn_parse_cmd:
- //.......
- break;
- case conn_new_cmd:
- //.......
- reset_cmd_handler(c);
- //.......
- break;
- case conn_nread:
- //.......
- break;
- //.......
- case conn_max_state:
- //.......
- break;
- }
- }
- //.......
- }
- // 分发一个新的连接到其他线程
- void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
- int read_buffer_size, enum network_transport transport) {
- // 将 sfd 封装到 CQ_ITEM 节点中
- CQ_ITEM *item = cqi_new();
- char buf[1];
- // 通过轮训的方式确定一个 work 线程的编号
- //settings.num_threads work 线程数目
- int tid = (last_thread + 1) % settings.num_threads;
- //LIBEVENT_THREAD 该结构保存了线程相关资源
- //threads 全局变量
- //threads + tid 对应线程相关资源
- LIBEVENT_THREAD *thread = threads + tid;
- last_thread = tid;
- item->sfd = sfd;
- //init_state = conn_new_cmd
- // 新连接
- item->init_state = init_state;
- item->event_flags = event_flags;
- item->read_buffer_size = read_buffer_size;
- item->transport = transport;
- item->mode = queue_new_conn;
- // 将封装好 sfd 的节点 push 到对应 work 线程 fd 队列中
- cq_push(thread->new_conn_queue, item);
- MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
- //'c'新连接标志
- buf[0] = 'c';
- // 将标志写入 notify_send_fd 管道
- //work 线程发现 notify_receive_fd 管道有数据发送过来, 就会触发管道事件的回调函数
- if (write(thread->notify_send_fd, buf, 1) != 1) {
- perror("Writing to thread notify pipe");
- }
- }
- static void reset_cmd_handler(conn *c) {
- //.......
- if (c->rbytes> 0) {
- // 已经读取了客户端的数据, 那么设置状态为 conn_parse_cmd,
- // 提示状态机 drive_machine 下一步执行 conn_parse_cmd 逻辑处理
- conn_set_state(c, conn_parse_cmd);
- } else {
- // 还未读取数据, 是一个新连接
- // 将状态设置为 conn_waiting, 提示状态机下一步执行 conn_waiting 逻辑处理
- //conn_waiting 中执行了修改 update_event(c, EV_READ | EV_PERSIST), 用于监听可读事件
- conn_set_state(c, conn_waiting);
- }
- //.......
- }
来源: http://www.jianshu.com/p/a6c3a3e7c363