这部分的内容主要包括 Epoll/select 的封装,在封装好相应函数后,再使用一个类来管理相应事件,实现的文件为 pollmgr.{h, cc}。
事件函数封装
可看到 pollmgr.h 文件下定一个了一个虚基类 aio_mgr
View Code
- 1 class aio_mgr {
- 2 public: 3 virtual void watch_fd(int fd, poll_flag flag) = 0;
- 4 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
- 5 virtual bool is_watched(int fd, poll_flag flag) = 0;
- 6 virtual void wait_ready(std: :vector < int > *readable, std: :vector < int > *writable) = 0;
- 7 virtual~aio_mgr() {}
- 8
- };
这便是具体事件类实现的基类,可看到文件末尾处的继承关系
View Code
- 1 class SelectAIO: public aio_mgr {
- 2 public: 3 4 SelectAIO();
- 5~SelectAIO();
- 6 void watch_fd(int fd, poll_flag flag);
- 7 bool unwatch_fd(int fd, poll_flag flag);
- 8 bool is_watched(int fd, poll_flag flag);
- 9 void wait_ready(std: :vector < int > *readable, std: :vector < int > *writable);
- 10 11 private: 12 13 fd_set rfds_;
- 14 fd_set wfds_;
- 15 int highfds_;
- 16 int pipefd_[2];
- 17 18 pthread_mutex_t m_;
- 19 20
- };
- 21 22#ifdef __linux__ 23 class EPollAIO: public aio_mgr {
- 24 public: 25 EPollAIO();
- 26~EPollAIO();
- 27 void watch_fd(int fd, poll_flag flag);
- 28 bool unwatch_fd(int fd, poll_flag flag);
- 29 bool is_watched(int fd, poll_flag flag);
- 30 void wait_ready(std: :vector < int > *readable, std: :vector < int > *writable);
- 31 32 private: 33 int pollfd_;
- 34 struct epoll_event ready_[MAX_POLL_FDS];
- 35 int fdstatus_[MAX_POLL_FDS];
- 36 37
- };
- 38#endif
- /* __linux */
相应是使用 select 和 epoll 分别实现的事件管理类,其中最主要的方法是 wait_ready,这个方法实现了具体的事件查询,其余几个函数用于管理套接字,如增加套接字,删除套接字以及判断套接字是否还存活着。这里我们主要看下 epoll 实现部分,select 实现部分类似。
View Code
- 1 EPollAIO: :EPollAIO() 2 {
- 3 pollfd_ = epoll_create(MAX_POLL_FDS);
- 4 VERIFY(pollfd_ >= 0);
- 5 bzero(fdstatus_, sizeof(int) * MAX_POLL_FDS);
- 6
- }
- 7 8 EPollAIO: :~EPollAIO() 9 {
- 10 close(pollfd_);
- 11
- }
- 12 13 //状态转换
- 14 static inline 15 int poll_flag_to_event(poll_flag flag) 16 {
- 17 int f;
- 18
- if (flag == CB_RDONLY) {
- 19 f = EPOLLIN;
- 20
- } else if (flag == CB_WRONLY) {
- 21 f = EPOLLOUT;
- 22
- } else { //flag == CB_RDWR
- 23 f = EPOLLIN | EPOLLOUT;
- 24
- }
- 25
- return f;
- 26
- }
- 27
- /*
- 28 * 这个函数就相当于:准备下一个监听事件的类型
- 29 */
- 30 void 31 EPollAIO: :watch_fd(int fd, poll_flag flag) 32 {
- 33 VERIFY(fd < MAX_POLL_FDS);
- 34 35 struct epoll_event ev;
- 36 int op = fdstatus_[fd] ? EPOLL_CTL_MOD: EPOLL_CTL_ADD;
- 37 fdstatus_[fd] |= (int) flag;
- 38 39 //边缘触发模式
- 40 ev.events = EPOLLET;
- 41 ev.data.fd = fd;
- 42 //注册读事件
- 43
- if (fdstatus_[fd] & CB_RDONLY) {
- 44 ev.events |= EPOLLIN;
- 45
- } //注册写事件
- 46
- if (fdstatus_[fd] & CB_WRONLY) {
- 47 ev.events |= EPOLLOUT;
- 48
- }
- 49 50
- if (flag == CB_RDWR) {
- 51 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
- 52
- }
- 53 //更改
- 54 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
- 55
- }
- 56 57 bool 58 EPollAIO: :unwatch_fd(int fd, poll_flag flag) 59 {
- 60 VERIFY(fd < MAX_POLL_FDS);
- 61 fdstatus_[fd] &= ~ (int) flag;
- 62 63 struct epoll_event ev;
- 64 int op = fdstatus_[fd] ? EPOLL_CTL_MOD: EPOLL_CTL_DEL;
- 65 66 ev.events = EPOLLET;
- 67 ev.data.fd = fd;
- 68 69
- if (fdstatus_[fd] & CB_RDONLY) {
- 70 ev.events |= EPOLLIN;
- 71
- }
- 72
- if (fdstatus_[fd] & CB_WRONLY) {
- 73 ev.events |= EPOLLOUT;
- 74
- }
- 75 76
- if (flag == CB_RDWR) {
- 77 VERIFY(op == EPOLL_CTL_DEL);
- 78
- }
- 79 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
- 80
- return (op == EPOLL_CTL_DEL);
- 81
- }
- 82 83 bool 84 EPollAIO: :is_watched(int fd, poll_flag flag) 85 {
- 86 VERIFY(fd < MAX_POLL_FDS);
- 87
- return ((fdstatus_[fd] & CB_MASK) == flag);
- 88
- }
- 89
- /**
- 90 * 事件循环,查看有哪些事件已经准备好,准备好的事件则插入相应列表中
- 91 */
- 92 void 93 EPollAIO: :wait_ready(std: :vector < int > *readable, std: :vector < int > *writable) 94 {
- 95 //得到已准备好的事件数目
- 96 int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
- 97 //遍历套接字数组,将可读/可写套接字添加到readable/writable数组中,便于后面处理
- 98
- for (int i = 0; i < nfds; i++) {
- 99
- if (ready_[i].events & EPOLLIN) {
- 100 readable - >push_back(ready_[i].data.fd);
- 101
- }
- 102
- if (ready_[i].events & EPOLLOUT) {
- 103 writable - >push_back(ready_[i].data.fd);
- 104
- }
- 105
- }
- 106
- }
事件管理
在 pollmgr.h 中还有个重要的类
View Code
- class aio_callback {
- public: virtual void read_cb(int fd) = 0;
- virtual void write_cb(int fd) = 0;
- virtual~aio_callback() {}
- };
这是一个回调虚基类,里面两个函数可从函数名猜到功能,即从对应的套接字读取 / 写入数据。该基类在后面底层通信中扮演着重要的角色。
然后我们再看后面的 PollMgr 类,这便是事件管理类,同时它还使用了单例模式。
View Code
- 1 class PollMgr {
- 2 public: 3 PollMgr();
- 4~PollMgr();
- 5 6 static PollMgr * Instance();
- 7 static PollMgr * CreateInst();
- 8 //在对应的套接字上添加事件
- 9 void add_callback(int fd, poll_flag flag, aio_callback * ch);
- 10 //删除套接字上的所有事件
- 11 void del_callback(int fd, poll_flag flag);
- 12 bool has_callback(int fd, poll_flag flag, aio_callback * ch);
- 13 //阻塞删除套接字,为何阻塞呢?因为删除时,其它线程正在使用该套接字
- 14 void block_remove_fd(int fd);
- 15 //主要事件循环方法
- 16 void wait_loop();
- 17 18 static PollMgr * instance;
- 19 static int useful;
- 20 static int useless;
- 21 22 private: 23 pthread_mutex_t m_;
- 24 pthread_cond_t changedone_c_;
- 25 pthread_t th_;
- 26 27 aio_callback * callbacks_[MAX_POLL_FDS]; //事件数组,即数组下标为相应的套接字
- 28 aio_mgr * aio_; //具体的事件函数类,可实现为epoll/select
- 29 bool pending_change_;
- 30
- };
其中最主要的函数是 wait_loop
接下来我们看具体实现。
View Code
- 1 PollMgr * PollMgr: :instance = NULL;
- 2 static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
- 3 4 void 5 PollMgrInit() 6 {
- 7 PollMgr: :instance = new PollMgr();
- 8
- }
- 9 10 PollMgr * 11 PollMgr: :Instance() 12 {
- 13 //保证PollMgrInit在本线程内只初始化一次
- 14 pthread_once( & pollmgr_is_initialized, PollMgrInit);
- 15
- return instance;
- 16
- }
这里实现单例,pthread_once 保证了线程中只初始化一次 PollMgrInit() 函数,所以在具体使用时,只需调用 PollMgr::Instance() 即可获得该管理类,再在其上处理各种各种事件。这里有个小疑问是:instance 变量不应该是私有变量吗?
接下来我们看构造析构函数:
View Code
- PollMgr: :PollMgr() : pending_change_(false) {
- bzero(callbacks_, MAX_POLL_FDS * sizeof(void * ));
- //aio_ = new SelectAIO();
- aio_ = new EPollAIO();
- VERIFY(pthread_mutex_init( & m_, NULL) == 0);
- VERIFY(pthread_cond_init( & changedone_c_, NULL) == 0);
- //this表示本类,wait_loop是本类中的一个方法,false表示不分离(detach)
- VERIFY((th_ = method_thread(this, false, &PollMgr: :wait_loop)) != 0);
- }
- PollMgr: :~PollMgr() {
- //never kill me!!!
- VERIFY(0);
- }
构造函数中初始化了事件类,使用了 EpollAIO 类,初始化了互斥量和条件变量,然后创建了一个线程调用 wait_loop。有意思的是析构函数 (never kill me)
接下来是几个管理函数,管理套接字和回调的函数
View Code
- 1 void 2 PollMgr: :add_callback(int fd, poll_flag flag, aio_callback * ch) 3 {
- 4 VERIFY(fd < MAX_POLL_FDS);
- 5 6 ScopedLock ml( & m_);
- 7 aio_ - >watch_fd(fd, flag);
- 8 9 VERIFY(!callbacks_[fd] || callbacks_[fd] == ch);
- 10 callbacks_[fd] = ch;
- 11
- }
- 12 13 //remove all callbacks related to fd
- 14 //the return guarantees that callbacks related to fd
- 15 //will never be called again
- 16 void 17 PollMgr: :block_remove_fd(int fd) 18 {
- 19 ScopedLock ml( & m_);
- 20 aio_ - >unwatch_fd(fd, CB_RDWR);
- 21 pending_change_ = true;
- 22 VERIFY(pthread_cond_wait( & changedone_c_, &m_) == 0);
- 23 callbacks_[fd] = NULL;
- 24
- }
- 25 26 //删除相应的回调函数
- 27 void 28 PollMgr: :del_callback(int fd, poll_flag flag) 29 {
- 30 ScopedLock ml( & m_);
- 31
- if (aio_ - >unwatch_fd(fd, flag)) {
- 32 callbacks_[fd] = NULL;
- 33
- }
- 34
- }
- 35 36 //
- 37 bool 38 PollMgr: :has_callback(int fd, poll_flag flag, aio_callback * c) 39 {
- 40 ScopedLock ml( & m_);
- 41
- if (!callbacks_[fd] || callbacks_[fd] != c) 42
- return false;
- 43 44
- return aio_ - >is_watched(fd, flag);
- 45
- }
下面便是循环的主方法,该方法一直循环获取相应的事件,但此方法有个问题是,当某个回调读取需要长时间阻塞时,
会耽误后续事件的读取或写入。
View Code
- //循环的主方法
- void PollMgr: :wait_loop() {
- std: :vector < int > readable; //可读套接字的vector
- std: :vector < int > writable; //可写套接字的vector
- //
- while (1) {
- {
- ScopedLock ml( & m_);
- if (pending_change_) {
- pending_change_ = false;
- VERIFY(pthread_cond_broadcast( & changedone_c_) == 0);
- }
- }
- //首先清空两个vector
- readable.clear();
- writable.clear();
- //这里便监听了事件,读或写事件,有时间发生便将事件的fd插入相应的vector
- aio_ - >wait_ready( & readable, &writable);
- //如果这次没有可读和可写事件,则继续下一次循环
- if (!readable.size() && !writable.size()) {
- continue;
- }
- //no locking of m_
- //because no add_callback() and del_callback should
- //modify callbacks_[fd] while the fd is not dead
- for (unsigned int i = 0; i < readable.size(); i++) {
- int fd = readable[i];
- if (callbacks_[fd]) //相应的回调函数读取套接字上的数据
- callbacks_[fd] - >read_cb(fd);
- }
- for (unsigned int i = 0; i < writable.size(); i++) {
- int fd = writable[i];
- if (callbacks_[fd]) callbacks_[fd] - >write_cb(fd);
- }
- }
- }
具体使用时,只需获得单例类即可,然后再添加相应的套接字及回调函数,添加都是线程安全的,因为在相应的实现上都会阻塞在内部互斥变量 m_上
来源: http://www.cnblogs.com/fwensen/p/5778134.html