目录
muduo 网络库学习笔记(四) 通过 eventfd 实现的事件通知机制
eventfd 的使用
eventfd 系统函数
使用示例
EventLoop 对 eventfd 的封装
工作时序
- runInLoop()
- queueInLoop()
- wakeup()
- handleRead()
- doPendingFunctors()
总结
上篇文章为 EventLoop 添加了一个定时器 Fd, 为 EventLoop 增加了 3 个接口: runAfter(),runAt(),runEvery(), 这三个接口用于处理定时任务和周期任务. 底层通过封装 TimerFd 实现.
- TimerId runAt(const TimeStamp& time, const NetCallBacks::TimerCallBack& cb);
- TimerId runAfter(double delay, const NetCallBacks::TimerCallBack& cb);
- TimerId runEvery(double interval, const NetCallBacks::TimerCallBack& cb);
今天为 EventLoop 添加另一个 Fd:EventFd, 用于实现线程间的事件通知机制. 本文会先介绍 eventfd 的使用, 然后给出 muduo 中 EventLoop 对 eventfd 的封装.
eventfd 的使用
eventfd 系统函数
eventfd - 事件通知文件描述符
- #include <sys/eventfd.h>
- int eventfd(unsigned int initval ,int flags );
创建一个能被用户应用程序用于时间等待唤醒机制的 eventfd 对象.
initval :
eventfd()创建一个可用作事件的 "eventfd 对象" 用户空间应用程序和内核等待 / 通知机制通知用户空间应用程序的事件. 该对象包含一个由内核维护的无符号 64 位整型 (uint64_t) 计数器. 此计数器的初始值通过 initval 指定. 一般设 0.
flags :
以下标志中按位 OR 运算以更改 eventfd()的行为,(文件中常用的这两个 flags 肯定都懂意思吧, 就不翻译了, 第三个信号量的不管它.):
- EFD_CLOEXEC (since Linux 2.6.27)
- Set the close-on-exec (FD_CLOEXEC) flag on the new file
- descriptor. See the description of the O_CLOEXEC flag in
- open(2) for reasons why this may be useful.
- EFD_NONBLOCK (since Linux 2.6.27)
- Set the O_NONBLOCK file status flag on the new open file
- description. Using this flag saves extra calls to fcntl(2) to
- achieve the same result.
- EFD_SEMAPHORE (since Linux 2.6.30)
- Provide semaphore-like semantics for reads from the new file
- descriptor. See below.
- read(2)
成功读取返回一个 8byte 的整数. read(2)如果提供的缓冲区的大小小于 8 个字节返回错误 EINVAL
write (2)
将缓冲区写入的 8 字节整形值加到内核计数器上. 可以写入的最大值
是计数器中是最大的无符号 64 位值减 1(即 0xfffffffffffffffe).
返回值:
On success, eventfd() returns a new eventfd file descriptor. On error, -1 is returned and errno is set to indicate the error.
使用示例
- #include <iostream>
- #include <assert.h>
- #include <poll.h>
- #include <signal.h>
- #include <sys/eventfd.h>
- #include <unistd.h>
- #include <string.h>
- #include <thread>
- static int s_efd = 0;
- int createEventfd()
- {
- int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
- std::cout <<"createEventfd() fd :" << evtfd << std::endl;
- if (evtfd < 0)
- {
- std::cout << "Failed in eventfd\n";
- abort();
- }
- return evtfd;
- }
- void testThread()
- {
- int timeout = 0;
- while(timeout < 3) {
- sleep(1);
- timeout++;
- }
- uint64_t one = 1;
- ssize_t n = write(s_efd, &one, sizeof one);
- if(n != sizeof one)
- {
- std::cout << "writes" << n << "bytes instead of 8\n";
- }
- }
- int main()
- {
- s_efd = createEventfd();
- fd_set rdset;
- FD_ZERO(&rdset);
- FD_SET(s_efd, &rdset);
- struct timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
- std::thread t(testThread);
- while(1)
- {
- if(select(s_efd + 1, &rdset, NULL, NULL, &timeout) == 0)
- {
- std::cout << "timeout\n";
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
- FD_SET(s_efd, &rdset);
- continue;
- }
- uint64_t one = 0;
- ssize_t n = read(s_efd, &one, sizeof one);
- if(n != sizeof one)
- {
- std::cout << "read" << n << "bytes instead of 8\n";
- }
- std::cout << "wakeup !\n";
- break;
- }
- t.join();
- close(s_efd);
- return 0;
- }
- ./test.out
- createEventfd() fd : 3
- timeout
- timeout
- timeout
- wakeup !
eventfd 单纯的使用文件描述符实现的线程间的通知机制, 可以很好的融入 select,poll,epoll 的 I/O 复用机制中.
EventLoop 对 eventfd 的封装
所增加的接口及成员:
- typedef std::function<void()> Functor;
- void runInLoop(const Functor& cb);
- void wakeup(); // 是写 m_wakeupFd 通知 poll 处理读事件.
- void queueInLoop(const Functor& cb);
- private:
- //used to waked up
- void handleRead();
- void doPendingFunctors();
- int m_wakeupFd;
- std::unique_ptr<Channel> p_wakeupChannel;
- mutable MutexLock m_mutex;
- bool m_callingPendingFunctors; /* atomic */
- std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_
工作时序
- (runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()
- runInLoop()
如果用户在当前 IO 线程调用这个函数, 回调会同步进行; 如果用户在其他线程调用 runInLoop(),cb 会被加入队列, IO 线程会被唤醒来调用这个 Functor.
- void EventLoop::runInLoop(const Functor& cb)
- {
- if(isInloopThread())
- cb();
- else
- queueInLoop(cb);
- }
- queueInLoop()
会将回调添加到容器, 同时通过 wakeup()唤醒 poll()调用容器内的回调.
- void EventLoop::queueInLoop(const Functor& cb)
- {
- LOG_TRACE <<"EventLoop::queueInLoop()";
- {
- MutexLockGuard lock(m_mutex);
- m_pendingFunctors.push_back(std::move(cb));
- }
- if(!isInloopThread())
- {
- wakeup();
- }
- }
内部实现,
wakeup()
写已注册到 poll 的 eventfd 通知 poll 处理读事件.
- // m_wakeupFd(createEventfd()),
- // p_wakeupChannel(new Channel(this, m_wakeupFd)),
- void EventLoop::wakeup()
- {
- uint64_t one = 1;
- ssize_t n = sockets::write(m_wakeupFd, &one, sizeof one);
- if(n != sizeof one)
- {
- LOG_ERROR << "EventLoop::wakeup() writes" << n << "bytes instead of 8";
- }
- }
- handleRead()
poll 回调读事件, 处理 eventfd.
- void EventLoop::handleRead() //handle wakeup Fd
- {
- LOG_TRACE << "EventLoop::handleRead() handle wakeup Fd";
- uint64_t one = 1;
- ssize_t n = sockets::read(m_wakeupFd, &one, sizeof one);
- if(n != sizeof one)
- {
- LOG_ERROR << "EventLoop::handleRead() reads" << n << "bytes instead of 8";
- }
- doPendingFunctors();
- }
- doPendingFunctors()
处理挂起的事件.
- void EventLoop::doPendingFunctors()
- {
- LOG_TRACE << "EventLoop::doPendingFunctors()";
- std::vector<Functor> functors;
- m_callingPendingFunctors = true;
- {
- MutexLockGuard lock(m_mutex);
- functors.swap(m_pendingFunctors);
- }
- for(size_t i = 0; i < functors.size(); ++i)
- {
- functors[i]();
- }
- m_callingPendingFunctors = false;
- }
总结
本文主要介绍了 muduo 中 EventLoop 通过 通过封装一层 eventfd 实现的 runInLoop()函数, 使得其他线程想往 EventLoop 所在的 I/O 线程注册任务成为可能.
下篇文章会写 Connector 和 Acceptor, 链接器和监听器 实现第一条链接.
来源: https://www.cnblogs.com/ailumiyana/p/9961125.html