本文代码选自内核 4.17
eventfd(2) - 创建一个文件描述符用于事件通知.
使用
源码分析
参考
- #include <sys/eventfd.h>
- int eventfd(unsigned int initval, int flags);
- int eventfd2(unsigned int initval, int flags);
参数
- \initval 为初始值(关联内部结构的 count)
- \flags 内核 2.6.26 之前的版本这个参数无效且必须指定为 0
flags 有意义的参数为
- EFD_CLOEXEC, 等效于 O_CLOEXEC
- EFD_NONBLOCK, 等效于 O_NONBLOCK
- EFD_SEMAPHORE, 信号量选项, 影响 read(2) 的取值
返回
- 成功返回一个新的文件描述符, 失败返回 -1 并设置 errno
eventfd 作为一个非常简单的抽象文件, 每个文件描述符都对应一个在内核空间维护的 __u64 count, 一个无符号 64 位整形的计数器, 而 eventfd 对应的文件操作都与这个计数器相关.
提供的文件操作
read(2), 读取 count 减少的值, 若 flags 设置 EFD_SEMAPHORE 则 count -= 1, 否则 count -= count; 函数成功返回 8
write(2), 写入一个 cnt,count += cnt, 函数成功返回 8
poll(2), poll 操作, 事件通知的核心, 详见下
close(2), eventfd 结构对象引用计数减一, 若未 0, 则释放所占用的内存
使用
eventfd(2) 核心就是其 poll 操作, 最常见的用法是配合 select(2)/poll(2)/epoll(2) 使用达到不同线程间通信的作用.
- #include <poll.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <sys/eventfd.h>
- #include <pthread.h>
- int efd;
- void *run_eventfd_write(void *arg) {
- uint64_t count = 1;
- while (1) {
- printf("write count: %zu\n", count);
- write(efd, &count, sizeof(count));
- count++;
- sleep(2);
- }
- }
- int main() {
- struct pollfd fds;
- pthread_t pid;
- unsigned int initval = 1000; // 观察将 1000 改为 0 后打印的顺序
- int flags = 0;
- int timeout = 1000;
- // flags |= EFD_SEMAPHORE; // 观察将该注释取消打印的结果
- efd = eventfd(initval, flags);
- fds.fd = efd;
- fds.events |= POLLIN;
- pthread_create(&pid, NULL, run_eventfd_write, NULL);
- while (1) {
- int ret = poll(&fds, 1, timeout);
- if (ret> 0) {
- uint64_t count;
- read(efd, &count, sizeof(count));
- printf("read count: %zu\n", count);
- }
- }
- }
- read count: 1000
- write count: 1
- read count: 1
- write count: 2
- read count: 2
- write count: 3
- read count: 3
- write count: 4
- read count: 4
这里使用了一个非常简单的示例, 程序不严谨但是很好的展示了如何在两个线程进行通信, 在子线程中, 通过一个无限循环每隔一秒向 eventfd 中写入一个逐渐增大的无符号长整形数字, 在主线程中通过 poll(2) 接收到有就绪事件产生, 并且使用 read 函数读取内核空间的计数器减少的值.
read write 系统调用的参数都是以下的形式
int read(int, void *, size_t);
而 eventfd 内部是维护的计数器, 所以在使用的时候, 保持第二个参数和第三个参数分别为 uint64_t * 和 sizeof(uint64_t)
实现
eventfd(2) 代码实现位于 fs/eventfd.c 中
从代码实现的目录就可以发现, eventfd 是作为一种文件来实现的, 代码很简单, 不到 500 行, 非常容易理解. 通过 eventfd 也可以窥探一下内核驱动的逻辑.
struct eventfd_ctx
struct eventfd_ctx 为 eventfd 在内核空间维护的结构, 简单轻量.
- struct eventfd_ctx {
- struct kref kref; // 结构的引用计数, 为 0 时回收内存空间
- wait_queue_head_t wqh; // 等待队列头
- /*
- * Every time that a write(2) is performed on an eventfd, the
- * value of the __u64 being written is added to "count" and a
- * wakeup is performed on "wqh". A read(2) will return the "count"
- * value to userspace, and will reset "count" to zero. The kernel
- * side eventfd_signal() also, adds to the "count" counter and
- * issue a wakeup.
- */
- __u64 count; // 和文件操作紧密相关的计数器
- unsigned int flags; // 一些标志位
- };
- eventfd(2)
系统调用用于创建一个新的文件描述符, 初始化内核空间的计数器, 还需要初始化等待队列头, 后面的读写文件操作都会将自己投入到等待队列中.
- static int do_eventfd(unsigned int count, int flags)
- {
- struct eventfd_ctx *ctx;
- int fd;
- /* Check the EFD_* constants for consistency. */
- BUILD_BUG_ON(EFD_CLOEXEC != O_CLOEXEC);
- BUILD_BUG_ON(EFD_NONBLOCK != O_NONBLOCK);
- // flags 只能在 EFD_CLOEXEC EFD_NONBLOCK EFD_SEMAPHORE 中产生
- if (flags & ~EFD_FLAGS_SET)
- return -EINVAL;
- ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
- if (!ctx)
- return -ENOMEM;
- kref_init(&ctx->kref); // 初始化内存引用为 1
- init_waitqueue_head(&ctx->wqh); // 初始化等待队列头
- ctx->count = count; // 初始化计数器的为 count
- ctx->flags = flags; // 设置 flags
- // 创建一个新的文件描述符, 并且设置 eventfd 的文件操作
- fd = anon_inode_getfd("[eventfd]", &eventfd_fops, ctx,
- O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
- if (fd <0)
- eventfd_free_ctx(ctx);
- return fd;
- }
eventfd_fops 为 eventfd 的文件操作结构, 最后注册在文件的 f_op 结构中.
- static const struct file_operations eventfd_fops = {
- .release = eventfd_release, // 文件的关闭操作
- .poll = eventfd_poll, // 文件的 poll 操作
- .read = eventfd_read, // 读
- .write = eventfd_write, // 写
- .llseek = noop_llseek,
- };
- eventfd_read(2), read(2), eventfd_write(2), write(2)
- static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
- loff_t *ppos)
- {
- struct eventfd_ctx *ctx = file->private_data; // 将 eventfd 结构从文件的私有数据中取出来
- ssize_t res;
- __u64 ucnt = 0;
- DECLARE_WAITQUEUE(wait, current); // 声明一个等待队列项
- if (count <sizeof(ucnt)) // 读取的内存内存必须可以容下一个 sizeof(u64)
- return -EINVAL;
- spin_lock_irq(&ctx->wqh.lock);
- res = -EAGAIN; // 初始设置 EAGAIN, 对应非阻塞模式且不符合可读条件
- if (ctx->count> 0) // 计数器的值大于 0, 意味着可以进行 read 操作, 返回值取 8
- res = sizeof(ucnt);
- else if (!(file->f_flags & O_NONBLOCK)) { // count = 0 并且为设置非阻塞的方式
- __add_wait_queue(&ctx->wqh, &wait); // 将等待项添加到等待队列中
- for (;;) {
- set_current_state(TASK_INTERRUPTIBLE); // 设置任务的运行状态为可中断
- if (ctx->count> 0) { // 计数器值大于 0, 退出循环
- res = sizeof(ucnt);
- break;
- }
- if (signal_pending(current)) { // 当前任务有信号产生, 退出循环, 转而处理信号中断
- res = -ERESTARTSYS;
- break;
- }
- spin_unlock_irq(&ctx->wqh.lock);
- schedule(); // 调度
- spin_lock_irq(&ctx->wqh.lock);
- }
- __remove_wait_queue(&ctx->wqh, &wait); // 退出循环, 删除等待队列中的等待项
- __set_current_state(TASK_RUNNING); // 设置任务的运行状态为 运行
- }
- if (likely(res> 0)) {
- eventfd_ctx_do_read(ctx, &ucnt); // 根据 eventfd 的 flags 来选择读取的数量
- if (waitqueue_active(&ctx->wqh))
- wake_up_locked_poll(&ctx->wqh, EPOLLOUT); // 唤醒当前的线程, 记住这个函数, 后面会配合 select 分析一下, 就可以把整个逻辑走通了.
- }
- spin_unlock_irq(&ctx->wqh.lock);
- if (res> 0 && put_user(ucnt, (__u64 __user *)buf)) // 将 count 减小的数量复制到用户空间
- return -EFAULT;
- return res;
- }
- static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
- {
- *cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count; // 设置了 EFD_SEMAPHORE, 读取的大小为 1
- ctx->count -= *cnt;
- }
- static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
- loff_t *ppos)
- {
- struct eventfd_ctx *ctx = file->private_data;
- ssize_t res;
- __u64 ucnt;
- DECLARE_WAITQUEUE(wait, current);
- if (count <sizeof(ucnt))
- return -EINVAL;
- if (copy_from_user(&ucnt, buf, sizeof(ucnt))) // 从用户空间复制 8 个字节进内核空间
- return -EFAULT;
- if (ucnt == ULLONG_MAX) // count 最大值为 ULLONG_MAX
- return -EINVAL;
- spin_lock_irq(&ctx->wqh.lock);
- res = -EAGAIN; // 初始设置 EAGAIN, 对应非阻塞模式且不符合可写入条件
- if (ULLONG_MAX - ctx->count> ucnt) // 是否可以写入
- res = sizeof(ucnt);
- else if (!(file->f_flags & O_NONBLOCK)) { // 不能写入且未设置非阻塞模式
- __add_wait_queue(&ctx->wqh, &wait); // 将等待项添加至等待队列中
- for (res = 0;;) { // 清除设置的 EAGAIN
- set_current_state(TASK_INTERRUPTIBLE); // 设置当前任务的运行状态为可中断
- if (ULLONG_MAX - ctx->count> ucnt) { // 可写入, 设置返回值, 退出循环
- res = sizeof(ucnt);
- break;
- }
- if (signal_pending(current)) { // 当前任务有信号产生
- res = -ERESTARTSYS;
- break;
- }
- spin_unlock_irq(&ctx->wqh.lock);
- schedule(); // 投入到调度队列中
- spin_lock_irq(&ctx->wqh.lock);
- }
- __remove_wait_queue(&ctx->wqh, &wait); // 删除等待队列中的等待项
- __set_current_state(TASK_RUNNING); // 设置任务正在运行
- }
- if (likely(res> 0)) {
- ctx->count += ucnt; // 计数器的值增加
- if (waitqueue_active(&ctx->wqh))
- wake_up_locked_poll(&ctx->wqh, EPOLLIN); // 唤醒线程
- }
- spin_unlock_irq(&ctx->wqh.lock);
- return res;
- }
除去对入参 cnt 的判断外, 在对阻塞模式处理的循环前对 res 的处理也不同, write(2) 是将原来的 res = -EAGAIN 赋值为 0, 而 read(2) 未做修改.
但是实际上两者的效果是一样的, 进入阻塞模式后, res 一定会取到一个值再返回.
read(2)/write(2) 每一次阻塞时都会将自己投入至内部结构的等待队列中 __add_wait_queue(), 在 count 可用后, 进行唤醒操作: 通过遍历当前等待队列, 唤醒线程
- poll
- static __poll_t eventfd_poll(struct file *file, poll_table *wait)
- {
- struct eventfd_ctx *ctx = file->private_data;
- __poll_t events = 0;
- u64 count;
- poll_wait(file, &ctx->wqh, wait); // 结合 select 一起看这个函数
- // 一些关于临界区资源访问的注释
- count = READ_ONCE(ctx->count);
- if (count> 0) // 数量大于 0 可读
- events |= EPOLLIN;
- if (count == ULLONG_MAX) // 数量达到上限, 错误
- events |= EPOLLERR;
- if (ULLONG_MAX - 1> count) // 可写
- events |= EPOLLOUT;
- return events;
- }
poll 的实现非常简单, 根据 count 的数量进行返回.
文件的 f_op->poll() 在 eventfd 中对应 eventfd_poll(), 在 select(2)/poll(2) 中看到两者都会循环调用 f_op->poll(), 以下使用 select(2) 的实现为参考.
在 select(2)调用时, 函数 do_select() -> poll_initwait() 设置 pt->_qproc 为 __pollwait(),select(2) 循环执行每个文件描述符对应的 poll 方法, 在 eventfd 中也就是调用 eventfd_poll().
eventfd_poll() 调用 poll_wait() -> 调用 pt->_qproc() 也就是 __pollwait(), 在 __pollwait() 中设置队列项的回调函数为 pollwake() 并将其投入至文件的等待队列中, 返回就绪的事件掩码.
发生了 read(2)/write(2) 操作, 在函数返回前, 调用
wake_up_locked_poll()
, 遍历文件的等待队列, 执行队列项的回调函数 (这里对应 select(2) 中的 pollwake()), 然后唤醒线程.
小结
eventfd 是一个非常轻量的事件通知方式, 通过它的简单运行机制, 也可以大概了解一般文件的处理方式. 结合 select(2)/poll(2)/epoll(2) 可以把多路复用这一块的整个知识点串联起来.
对 epoll(2) 分析之前把 eventfd 和 poll 先看一遍也是好处多多, 毕竟 epoll(2) 也是文件和事件通知的结合.
参考
select 源码分析, 上一篇写的关于 select 的分析, 有一些关于 poll 结构和文件回调的分析.
来源: https://www.cnblogs.com/shuqin/p/11700682.html