Linux 中一切皆文件,不论是我们存储在磁盘上的字符文件,可执行文件还是我们的接入电脑的 I/O 设备等都被 VFS 抽象成了文件,比如标准输入设备默认是键盘,我们在操作标准输入设备的时候,其实操作的是默认打开的一个文件描述符是 0 的文件,而一切软件操作硬件都需要通过 OS,而 OS 操作一切硬件都需要相应的驱动程序,这个驱动程序里配置了这个硬件的相应配置和使用方法.Linux 的 I/O 分为阻塞 I/O, 非阻塞 I/O,I/O 多路复用, 信号驱动 I/O 四种.对于 I/O 设备的驱动,一般都会提供关于阻塞和非阻塞两种配置.我们最常见的 I/O 设备之一 -- 键盘 (标准输入设备) 的驱动程序默认是阻塞的.
多路复用就是为了使进程能够从多个阻塞 I/O 中获得自己想要的数据并继续执行接下来的任务.其主要的思路就是同时监视多个文件描述符,如果有文件描述符的设定状态的被触发,就继续执行进程,如果没有任何一个文件描述符的设定状态被触发,进程进入 sleep
多路复用的一个主要用途就是实现 "I/O 多路复用并发服务器",和多线程并发或者多进程并发相比,这种服务器的系统开销更低,更适合做 web 服务器,但是由于其并没有实现真正的多任务,所以当压力大的时候,部分用户的请求响应会较慢
阻塞 I/O
阻塞 I/O,就是当进程试图访问这个 I/O 设备而这个设备并没有准备好的时候,设备的驱动程序会通过内核让这个试图访问的进程进入 sleep 状态.阻塞 I/O 的一个好处就是可以大大的节约 CPU 时间,因为一旦一个进程试图访问一个没有准备好的阻塞 I/O,就会进入 sleep 状态,而进入 sleep 状态的进程是不在内核的进程调度链表中,直到目标 I/O 准备好了将其唤醒并加入调度链表,这样就可以节约 CPU 时间.当然阻塞 I/O 也有其固有的缺点,如果进程试图访问一个阻塞 I/O,但是否访问成功并不对接下来的任务有决定性影响,那么直接使其进入 sleep 状态显然会延误其任务的完成.
典型的默认阻塞 IO 有标准输入设备,socket 设备,管道设备等,当我们使用 gets(),scanf(),read() 等操作请求这些 IO 时而 IO 并没有数据流入,就会造成进程的 sleep. 进程会一直阻塞下去直到接收缓冲区中有数据可读,此时内核再去唤醒该进程,通过相应的函数从中获取数据.如果阻塞过程中对方发生故障,那么这个进程将会永远阻塞下去.
写操作时发生阻塞的情况要比读操作少,主要发生在要写入的缓冲区的大小小于要写入的数据量的情况下,这时写操作将不进行任何任何拷贝工作,将发生阻塞.一旦发送缓冲区内有足够的空间,内核将唤醒进程,将数据从用户缓冲区中拷贝到相应的发送数据缓冲区.udp 不用等待确认,没有实际的发送缓冲区,所以 udp 协议中不存在发送缓冲区满的情况,在 udp 套接字上执行的写操作永远都不会阻塞
现假设一个进程希望通过三个管道中任意一个中读取数据并显示,伪代码如下
由于管道是阻塞 I/O,所以如果 pipe_0 没有数据流入,进程就是在第一个 read() 处进入 sleep 状态而即使 pipe_1 和 pipe_2 有数据流入也不会被读取.
read(pipe_0, buf, sizeof(buf)); //sleep
print buf;
read(pipe_1, buf, sizeof(buf));
print buf;
read(pipe_2, buf, sizeof(buf));
print buf;
如果我们使用下述代码重新设置管道的阻塞属性,显然,如果三个管道都没有数据流入,那么进程就无法获得请求的数据而继续执行,倘若这些数据很重要 (所以我们才要用阻塞 I/O),那结果就会十分的糟糕,改为轮询却又大量的占据 CPU 时间.
如何让进程同时监视三个管道,其中一个有数据就继续执行而不会 sleep,如果全部没有数据流入再 sleep,就是多路复用技术需要解决的问题.
int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);
非阻塞 I/O
非阻塞 I/O 就是当一个进程试图访问一个 I/O 设备的时候,无论是否从中获取了请求的数据都会返回并继续执行接下来的任务.,但非常适合请求是否成功对接下来的任务影响不大的 I/O 请求.但如果访问一个非阻塞 I/O,但这个请求如果失败对进程接下来的任务有致命影响,最粗暴的就是使用
while (1){read()}
轮询.显然,这种方式会占用大量的 CPU 时间.对于非阻塞 IO,除了直接返回,一个更重要的应用就是利用 IO 多路复用机制同时监视多个非阻塞 IO.
select 机制
select 是一种非常 "古老" 的同步 I/O 接口,但是提供了一种很好的 I/O 多路复用的思路
模型
select 的第一个参数 nfds 是指
fd_set //创建fd_set对象,将来从中增减需要监视的fd
FD_ZERO() //清空fd_set对象
FD_SET() //将一个fd加入fd_set对象中
select() //监视fd_set对象中的文件描述符
pselect() //先设定信号屏蔽,再监视
FD_ISSET() //测试fd是否属于fd_set对象
FD_CLR() //从fd_set对象中删除fd
Note:
集合中的最大的文件描述符 + 1
,因为 select 会无差别遍历整个文件描述符表直到找到目标,而文件描述符是从 0 开始的,所以一共是
集合中的最大的文件描述符 + 1
次.
上一条导致了这种机制的低效,如果需要监视的文件描述符是 0 和 100 那么每一次都会遍历 101 次
select() 每次返回都会修改 fd_set,如果要循环 select(),需要先对初始的 fd_set 进行备
例子_I/O 多路复用并发服务器
关于 server 本身的编程模型,参见 tcp/ip 协议服务器模型 和 udp/ip 协议服务器模型 这里仅是使用 select 实现伪并行的部分模型
nready = select(maxfd + 1, &temprfds, &tempwfds, NULL, NULL) if (FD_ISSET(listenfd, &temprfds)){
#define BUFSIZE 100#define MAXNFD 1024 int main() {
/***********服务器的listenfd已经准本好了**************/
fd_set readfds;
fd_set writefds;
FD_ZERO( & readfds);
FD_ZERO( & writefds);
FD_SET(listenfd, &readfds);
fd_set temprfds = readfds;
fd_set tempwfds = writefds;
int maxfd = listenfd;
int nready;
char buf[MAXNFD][BUFSIZE] = {
0
};
while (1) {
temprfds = readfds;
tempwfds = writefds;
for (; fd <= maxfd; fd++){
//如果监听到的是listenfd就进行accept
int sockfd = accept(listenfd, (struct sockaddr * ) & clientaddr, &len);
//将新accept的scokfd加入监听集合,并保持maxfd为最大fd
FD_SET(sockfd, &readfds);
maxfd = maxfd > sockfd ? maxfd: sockfd;
//如果意见检查了nready个fd,就没有必要再等了,直接下一个循环
if (--nready == 0) continue;
}
int fd = 0;
//遍历文件描述符表,处理接收到的消息
if (0 == ret){ //客户端链接已经断开
if (fd == listenfd) continue;
if (FD_ISSET(fd, &temprfds)) {
int ret = read(fd, buf[fd], sizeof buf[0]);
poll 机制
close(fd);
FD_CLR(fd, &readfds);
if (maxfd == fd)--maxfd;
continue;
}
//将fd加入监听可写的集合
FD_SET(fd, &writefds);
}
//找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中
//将在下一次while()循环开始监视
if (FD_ISSET(fd, &tempwfds)) {
int ret = write(fd, buf[fd], sizeof buf[0]);
printf("ret %d: %d\n", fd, ret);
FD_CLR(fd, &writefds);
}
}
}
close(listenfd);
}
poll 是一种基于 select 的改良机制,其针对 select 的一些缺陷进行了重新设计,包括不需要备份 fd_set 等等,但是依然是遍历整个文件描述符表,效率较低
模型
例子_I/O 多路复用并发服务器
struct pollfd fds //创建一个pollfd类型的数组
fds[0].fd //向fds[0]中放入需要监视的fd
fds[0].events //向fds[0]中放入需要监视的fd的触发事件
POLLIN //I/O有输入
POLLPRI //有紧急数据需要读取
POLLOUT //I/O可写
POLLRDHUP //流式套接字连接断开或套接字处于半关闭状态
POLLERR //错误条件(仅针对输出)
POLLHUP //挂起(仅针对输出)
POLLNVAL //无效的请求:fd没有被打开(仅针对输出)
if (0 == ret){ //如果连接断开了
/* ... */
int main() {
/* ... */
struct pollfd myfds[MAXNFD] = {
0
};
myfds[0].fd = listenfd;
myfds[0].events = POLLIN;
int maxnum = 1;
int nready;
//准备二维数组buf,每个fd使用buf的一行,数据干扰
char buf[MAXNFD][BUFSIZE] = {
0
};
while (1) {
//poll直接返回event被触发的fd的个数
nready = poll(myfds, maxnum, -1) int i = 0;
for (; i < maxnum; i++) {
//poll通过将相应的二进制位置一来表示已经设置
//如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了
if (myfds[i].revents & POLLIN) {
if (myfds[i].fd == listenfd) {
int sockfd = accept(listenfd, (struct sockaddr * ) & clientaddr, &len);
//将新accept的scokfd加入监听集合
myfds[maxnum].fd = sockfd;
myfds[maxnum].events = POLLIN;
maxnum++;
//如果意见检查了nready个fd,就直接下一个循环
if (--nready == 0) continue;
} else {
int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
epoll 在 poll 基础上实现的更为健壮的接口,它每次只会遍历我们关心的文件描述符,也是现在主流的 web 服务器使用的多路复用技术,epoll 一大特色就是支持
close(myfds[i].fd);
//初始化将文件描述符表所有的文件描述符标记为-1
//close的文件描述符也标记为-1
//打开新的描述符时从表中搜索第一个-1
//open()就是这样实现始终使用最小的fd
//这里为了演示并没有使用这种机制
myfds[i].fd = -1;
continue;
}
myfds[i].events = POLLOUT;
}
} else if (myfds[i].revents & POLLOUT) {
int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
myfds[i].events = POLLIN;
}
}
}
close(listenfd);
}
epoll
EPOLLET(边沿触发)
和
EPOLLLT(水平触发)
,前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是 EPOLLLT
模型
例子_I/O 多路复用并发服务器
epoll_create() //创建epoll对象
struct epoll_event //准备事件结构体和事件结构体数组
event.events
event.data.fd ...
epoll_ctl() //配置epoll对象
epoll_wait() //监控epoll对象中的fd及其相应的event
/* ... */
int main() {
/* ... */
/* 创建epoll对象 */
int epoll_fd = epoll_create(1024);
//准备一个事件结构体
struct epoll_event event = {
0
};
event.events = EPOLLIN;
event.data.fd = listenfd; //data是一个共用体,除了fd还可以返回其他数据
//ctl是监控listenfd是否有event被触发
//如果发生了就把event通过wait带出.
//所以,如果event里不标明fd,我们将来获取就不知道哪个fd
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event);
struct epoll_event revents[MAXNFD] = {
0
};
int nready;
char buf[MAXNFD][BUFSIZE] = {
0
};
while (1) {
//wait返回等待的event发生的数目
//并把相应的event放到event类型的数组中
nready = epoll_wait(epoll_fd, revents, MAXNFD, -1) int i = 0;
for (; i < nready; i++) {
//wait通过在events中设置相应的位来表示相应事件的发生
//如果输入可用,那么下面的这个结果应该为真
if (revents[i].events & EPOLLIN) {
//如果是listenfd有数据输入
if (revents[i].data.fd == listenfd) {
int sockfd = accept(listenfd, (struct sockaddr * ) & clientaddr, &len);
struct epoll_event event = {
0
};
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);
} else {
int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
if (0 == ret) {
close(revents[i].data.fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]);
}
revents[i].events = EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
}
} else if (revents[i].events & EPOLLOUT) {
int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
revents[i].events = EPOLLIN;
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
}
}
}
close(listenfd);
}
来源: http://click.aliyun.com/m/40093/