这是学习网络编程后写的一个练手的小程序,可以帮助复习 socket,I/O 复用,非阻塞 I/O 等知识点。
通过回顾写的过程中遇到的问题的形式记录程序的关键点,最后给出完整程序代码。
0. 功能
编写一个简易群聊程序,程序具备的基本功能:
服务器:支持多个客户端连接,并将每个客户端发过来的消息发给所有其他的客户端
客户端:能够连接服务器,并向服务器发送消息,同时接收服务器发过来的任何消息
1. Server I/O 模型
采用事件驱动(I/O 复用)+ 非阻塞 I/O 的模型,即 Reactor 模式。I/O 复用采用 linux 下的 epoll 机制。
相关 API 介绍见最后,先梳理几个写程序的时候想到的问题。
1.1 I/O 复用为什么搭配非阻塞 I/O?(select/epoll 返回可读后还用非阻塞是不是没有意义?)
select/epoll 返回了可读,并不一定代表能读,在返回可读到调用 read 函数之间,是有时间间隙的。内核可能把数据丢失,也可能存在比如多个线程监听该 socket,
数据被别人读走的情况。所以这里使用非阻塞 I/O 是有意义的。
1.2 epoll 的条件触发 LT(水平触发)和边缘触发 ET 区别,如何正确地处理 ET 模式下的读操作?
简单讲,以读取数据操作举例。条件触发,只要输入缓冲中还有数据,就会以事件方式再次注册;
而边缘触发中仅在输入缓冲收到数据时注册一次该事件(你没读完也 epoll_wait 也不再返回了)。
所以如果使用边缘触发发生输入相关事件,需要读取输入缓冲中的全部数据。方法是一直读,直到 read 返回 - 1, 并且变量 errno 中的值为 EAGAIN,说明没有数据可读。
所以在这里再次考虑一下 1.1 中的问题,epoll 如果采用边缘触发,更要使用非阻塞 I/O,否则可能就因为无数据可读阻塞整个线程了。
1.3 select 与 epoll 的差别
一个老生常谈的问题,select 函数效率低主要有以下两个原因,首先是每次调用 select 函数时需要向操作系统传递监视对象信息,其次是调用后针对所有文件描述符的循环语句。
第一点对效率的影响更大。
此外,epoll 还支持 ET 模式,而 select 只支持 LT 模式。
但 select 也有优点,比如兼容性好(大多数操作系统支持),在服务端介入者少的情况下仍然可以考虑使用 select。
1.4 epoll 相关 API
- // 创建一个epoll句柄,参数size向操作系统建议epoll例程大小
- int epoll_create(int size)
- /*
- 函数功能: epoll事件注册函数
- 参数epfd为epoll的句柄,即epoll_create返回值
- 参数op表示动作,用3个宏来表示:
- EPOLL_CTL_ADD(注册新的fd到epfd),
- EPOLL_CTL_MOD(修改已经注册的fd的监听事件),
- EPOLL_CTL_DEL(从epfd删除一个fd);
- 其中参数fd为需要监听的标示符;
- 参数event告诉内核需要监听的事件,event的结构如下:
- struct epoll_event {
- __uint32_t events; //Epoll events
- epoll_data_t data; //User data variable
- };
- 其中介绍events是宏的集合,常用的有:
- EPOLLIN:有数据可读
- EPOLLONESHOT:发生一次事件后,相应的文件描述符不再收到事件通知。因此需要向第二个参数传递EPOLL_CTL_MOD再次设置事件。
- 例如在多线程处理时,如果某个线程在处理fd的同时,又有新的一批数据发来,该fd可读,那么该fd会被分给另一个线程,这样两个线程处理同一个fd肯定就不对了,
- 这时用EPOLLONESHOT可以解决。在fd返回可读后,需要显式地设置一下才能让epoll重新返回这个fd。
- */
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
- // 等待事件的产生,函数返回需要处理的事件数目
- int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
2. Client 怎么处理?
Client 采用分割读写的方式,开两个进程。父进程负责负责接受数据,子进程负责发送数据。
- if (pid == 0) {
- //子进程负责写操作
- write_routine(sock);
- }
- else {
- //父进程负责读操作
- read_routine(sock);
- }
3. 代码
代码中有详细注释
- 1 //utility.h
- 2 #ifndef UTILITY_H_
- 3 #define UTILITY_H_
- 4
- 5 #include
- 6 #include
- 7 #include
- 8 #include
- 9 #include in.h>
- 10 #include
- 11 #include
- 12 #include
- 13 #include
- 14 #include
- 15 #include
- 16 #include
- 17 #include <string.h>
- 18
- 19 using namespace std;
- 20
- 21 // clients_list save all the clients's socket
- 22 list<int> clients_list;
- 23
- 24 /********************** macro defintion **************************/
- 25 // server ip
- 26 #define SERVER_IP "127.0.0.1"
- 27
- 28 // server port
- 29 #define SERVER_PORT 8888
- 30
- 31 //epoll size
- 32 #define EPOLL_SIZE 5000
- 33
- 34 //message buffer size
- 35 #define BUF_SIZE 0xFFFF
- 36
- 37 #define SERVER_WELCOME "Welcome you to join the chatroom! Your chat ID is: Client #%d"
- 38
- 39 #define SERVER_MESSAGE "ClientID %d say >> %s"
- 40
- 41 // exit
- 42 #define EXIT "EXIT"
- 43
- 44 #define CAUTION "There is only one int the chatroom!"
- 45
- 46 /********************** some function **************************/
- 47 /**
- 48 *设置非阻塞IO
- 49 **/
- 50 int setnonblocking(int sockfd)
- 51 {
- 52 fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)| O_NONBLOCK);
- 53 return 0;
- 54 }
- 55
- 56 /**
- 57 * 将文件描述符fd添加到epollfd标示的内核事件表中, 并注册EPOLLIN事件,
- 58 * EPOOLET表明是ET工作方式,根据enable_et来判定是否设置边缘触发。
- 59 * 最后将文件描述符设置非阻塞方式
- 60 **/
- 61 void addfd( int epollfd, int fd, bool enable_et )
- 62 {
- 63 struct epoll_event ev;
- 64 ev.data.fd = fd;
- 65 ev.events = EPOLLIN;
- 66 if( enable_et )
- 67 ev.events = EPOLLIN | EPOLLET;
- 68 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
- 69 setnonblocking(fd);
- 70 printf("fd added to epoll!\n\n");
- 71 }
- 72
- 73 /**
- 74 * 群发消息
- 75 **/
- 76 int sendBroadcastmessage(int clientfd)
- 77 {
- 78 // buf[BUF_SIZE] receive new chat message
- 79 // message[BUF_SIZE] save format message
- 80 char buf[BUF_SIZE], message[BUF_SIZE];
- 81 bzero(buf, BUF_SIZE);
- 82 bzero(message, BUF_SIZE);
- 83
- 84 // receive message
- 85 printf("read from client(clientID = %d)\n", clientfd);
- 86 int len = recv(clientfd, buf, BUF_SIZE, 0);
- 87
- 88 if(len == 0) // len = 0 means the client closed connection
- 89 {
- 90 close(clientfd);
- 91 clients_list.remove(clientfd); //server remove the client
- 92 printf("ClientID = %d closed.\n now there are %d client in the chatroom\n", clientfd, (int)clients_list.size());
- 93
- 94 }
- 95 else //broadcast message
- 96 {
- 97 if(clients_list.size() == 1) { // this means There is only one int the chatroom
- 98 send(clientfd, CAUTION, strlen(CAUTION), 0);
- 99 return len;
- 100 }
- 101 // format message to broadcast
- 102 sprintf(message, SERVER_MESSAGE, clientfd, buf);
- 103
- 104 list<int>::iterator it;
- 105 for(it = clients_list.begin(); it != clients_list.end(); ++it) {
- 106 if(*it != clientfd){
- 107 if( send(*it, message, BUF_SIZE, 0) < 0 ) { perror("error"); exit(-1);}
- 108 }
- 109 }
- 110 }
- 111 return len;
- 112 }
- 113 #endif // UTILITY_H_
- 1 //Server.cpp
- 2 3#include "utility.h"4 5 int main(int argc, char * argv[]) 6 {
- 7 //服务器端口号和IP地址
- 8 struct sockaddr_in serverAddr;
- 9 serverAddr.sin_family = PF_INET;
- 10 serverAddr.sin_port = htons(SERVER_PORT);
- 11 serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);
- 12 //创建监听套接字
- 13 int listener = socket(PF_INET, SOCK_STREAM, 0);
- 14
- if (listener < 0) {
- 15 perror("listener");
- exit( - 1);
- 16
- }
- 17 printf("listen socket created \n");
- 18 //绑定地址
- 19
- if (bind(listener, (struct sockaddr * ) & serverAddr, sizeof(serverAddr)) < 0) {
- 20 perror("bind error");
- 21 exit( - 1);
- 22
- }
- 23 //listen
- 24 int ret = listen(listener, 5);
- 25
- if (ret < 0) {
- 26 perror("listen error");
- 27 exit( - 1);
- 28
- }
- 29 printf("Start to listen: %s\n", SERVER_IP);
- 30 //创建epoll事件表
- 31 int epfd = epoll_create(EPOLL_SIZE);
- 32
- if (epfd < 0) {
- 33 perror("epfd error");
- 34 exit( - 1);
- 35
- }
- 36 printf("epoll created, epollfd = %d\n", epfd);
- 37 static struct epoll_event events[EPOLL_SIZE];
- 38 //注册监听套接字到epoll事件表
- 39 addfd(epfd, listener, true);
- 40 //main loop
- 41
- while (1) 42 {
- 43 //epoll_events_count指明待处理事件数
- 44 int epoll_events_count = epoll_wait(epfd, events, EPOLL_SIZE, -1);
- 45
- if (epoll_events_count < 0) {
- 46 perror("epoll failure");
- 47
- break;
- 48
- }
- 49 50 printf("epoll_events_count = %d\n", epoll_events_count);
- 51 //处理事件
- 52
- for (int i = 0; i < epoll_events_count; ++i) 53 {
- 54 int sockfd = events[i].data.fd;
- 55 //sockfd == listener表明有新连接
- 56
- if (sockfd == listener) 57 {
- 58 struct sockaddr_in client_address;
- 59 socklen_t client_addrLength = sizeof(struct sockaddr_in);
- 60 int clientfd = accept(listener, (struct sockaddr * ) & client_address, &client_addrLength);
- 61 62 printf("client connection from: %s : % d(IP : port), clientfd = %d \n", 63 inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port), clientfd);
- 64 65 //把新连接加入epoll事件表中
- 66 addfd(epfd, clientfd, true);
- 67 68 // 把clientfd加入客户连接的list内
- 69 clients_list.push_back(clientfd);
- 70 printf("Add new clientfd = %d to epoll\n", clientfd);
- 71 printf("Now there are %d clients int the chat room\n", (int) clients_list.size());
- 72 73 // 想新连接发送欢迎信息
- 74 printf("welcome message\n");
- 75 char message[BUF_SIZE];
- 76 bzero(message, BUF_SIZE);
- 77 sprintf(message, SERVER_WELCOME, clientfd);
- 78 int ret = send(clientfd, message, BUF_SIZE, 0);
- 79
- if (ret < 0) {
- 80 perror("send error");
- 81 exit( - 1);
- 82
- }
- 83
- }
- 84 //sockfd != listener表明之前的连接发来数据,将数据群发给所有连接对象
- 85
- else 86 {
- 87 printf("i got an message");
- 88 int ret = sendBroadcastmessage(sockfd);
- 89
- if (ret < 0) {
- perror("error");
- exit( - 1);
- }
- 90
- }
- 91
- }
- 92
- }
- 93 close(listener); //close socket
- 94 close(epfd); //close epoll instance
- 95
- return 0;
- 96
- }
- 1 //Client.cpp
- 2 3#include "utility.h"4 5 void write_routine(int sock);
- 6 void read_routine(int sock);
- 7 int main(int argc, char * argv[]) 8 {
- 9 //服务器IP和端口
- 10 struct sockaddr_in serverAddr;
- 11 serverAddr.sin_family = PF_INET;
- 12 serverAddr.sin_port = htons(SERVER_PORT);
- 13 serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);
- 14 15 // create socket
- 16 int sock = socket(PF_INET, SOCK_STREAM, 0);
- 17
- if (sock < 0) {
- perror("sock error");
- exit( - 1);
- }
- 18 19 // 连接服务器
- 20
- if (connect(sock, (struct sockaddr * ) & serverAddr, sizeof(serverAddr)) < 0) {
- 21 perror("connect error");
- 22 exit( - 1);
- 23
- }
- 24 char buf[BUF_SIZE];
- 25 int str_len = read(sock, buf, BUF_SIZE);
- 26
- if (str_len == 0) {
- 27
- return 0;
- 28
- }
- 29 buf[str_len] = 0;
- 30 printf("%s\n", buf);
- 31 32 pid_t pid = fork();
- 33
- if (pid == 0) {
- 34 //子进程负责写操作
- 35 write_routine(sock);
- 36
- }
- 37
- else {
- 38 //父进程负责读操作
- 39 read_routine(sock);
- 40
- }
- 41 42
- return 0;
- 43
- }
- 44 45 void read_routine(int sock) {
- 46 char buf[BUF_SIZE];
- 47
- while (1) {
- 48 memset(buf, 0, sizeof(buf));
- 49 int str_len = read(sock, buf, BUF_SIZE);
- 50
- if (str_len == 0) {
- 51
- return;
- 52
- }
- 53 buf[str_len] = 0;
- 54 printf("%s", buf);
- 55
- }
- 56
- }
- 57 58 void write_routine(int sock) {
- 59 char buf[BUF_SIZE];
- 60
- while (1) {
- 61 memset(buf, 0, sizeof(buf));
- 62 fgets(buf, BUF_SIZE, stdin);
- 63
- if (!strcmp(buf, "exit\n")) {
- 64 shutdown(sock, SHUT_WR);
- 65
- return;
- 66
- }
- 67 write(sock, buf, strlen(buf));
- 68
- }
- 69
- }
来源: http://www.cnblogs.com/wangxiaobao/p/6481316.html