消息与消息队列
IPC (Inter process communication)
广义: 所有可以用于进程间通信的对象和方法
狭义: 特指消息队列, 信号量, 共享内存
消息队列 应用于进程间少量数据的顺序共享
信号量 应用于进程间互斥
共享内存 应用与进程间大量数据的随机共享访问
命令行查询 IPC 对象
ipcs [...]
-q 查询 消息队列
-s 查询 信号量
-m 查询 共享内存
-a 查询所有
无 查询所有
命令行删除 IPC 对象
ipcrm [...]
-q msqid 删除消息队列
-m shmid 删除共享内存
-s semid 删除信号量
创建消息队列的步骤:
消息队列使用 (函数):
1: ftok 使用某个文件做关键字创建 key
2: msgget 使用 key 创建消息队列 msqid
3: msgsnd 往消息队列中写入消息
4: msgrcv 从消息队列中读取消息
5: msgctl 删除消息队列
创建 IPC
- <sys/types>
- <sys/ipc.h>
- key_t ftok(const char *pathname, int id)
注意: 由文件名 和 id 唯一决定一个 key, 文件名和 id 都一样的话, 获得的 key 必然一样, 否则, key 必然不一样, 通过创建一个 key, 实现 两个进程通信的桥梁关键字.
该文件也要必须存在, 可空白.
创建消息队列
- <sys/types.h>
- <sys/ipc.h>
- <sys/msg.h>
- int msgget(key_t key, int msgflg)
返回 msqid
创建消息队列
IPC_PRIVATE 创建 key = 0 的消息队列, 可以同时存在多个
IPC_EXCL | IPC_CREAT 确保这个消息队列是新创建的, 不与已有的消息队列冲突
往消息队列发送消息
int msgsnd(int msqid, const void *msg, size_t szLen, int msgflg);
参数解析:
msg 指向 msgbuf 结构 (自定义)
- struct msgbuf {
- long type; // 大于 0 的数
- char text[N]; // 文本大小由 szLen 指定
- }
- msgflag:
0 队列满则阻塞
IPC_NOWAIT 队列满则返回错误 (EAGAIN)
从消息队列中读取消息
size_t msgrcv(int msqid, void *msg, size_t sz, long msgtype, int msgflg)
参数:
msg 指向 msg_info 数据结构, 同 msgsnd
msgflg 取值 ( MSG_XXX 需要定义宏: _GNU_SOURCE)
0 : 默认值, 没有消息则阻塞
IPC_NOWAIT: 没有这个类型的消息, 立即返回 (ENOMSG)
MSG_EXCEPT: 返回队列中第一个不是某类型的消息
注意: 使用 MSG_EXCEPT 必须加上宏:#define _GNU_SOURCE, 并且该宏必须在 < sys/msg.h > 前面定义.
MSG_NOERROR: 消息内容大于请求长度, 丢弃多余部分
往消息队列发送消息
msgsnd 函数调用成功会修改 msqid_ds
msg_lspid 设置为对应进程 PID
msg_qnum 加 1
msg_stime 设置为当前时间
从队列中读取消息
msgrcv 调用成功后, 修改 msqid_ds 结构体
msg_lrpid: 设置为进程 PID
msg_qnum: 减 1
msg_rtime: 设置为当前是时间
例子:
- typedef struct tagMsg
- {
- long type;
- char szBuf[1024];
- }MSG_S;
- void testMsgRS()
- {
- int ch;
- fprintf(stderr,r"select r/w:");
- scanf("%c",&ch);
- if(ch!='w'&&ch!='r')
- {
- return ;
- }
- // 创建 key
- key_t key=ftok("123",321);
- if(key==-1)
- {
- perror("fail ftok");
- return ;
- }
- printf("key=%d\n",key);
- // 创建消息队列
- int msqid=msgget(key,IPC_CREAT|0666);
- if(msqid==-1)
- {
- perror("fail msgget!");
- return ;
- }
- printf("msqid=%d\n",msqid);
- MSG_S msg;
- //read message
- if(rs=='r')
- {
- while(1)
- {
- memset(&msg,0,sizeof(msg.szBuf));
- fprintf(stderr,"type:");
- scanf("%ld",&(msg.type));
- //read(0,&(msg.type),sizeof(long));
- int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type, IPC_NOWAIT);
- if(nRet<0)
- {
- perror("fail msgsnd");
- break;
- }
- printf("receive:%s\n",msg.szBuf);
- }
- }
- //send message
- else if(rs=='s')
- {
- fprintf(stderr,"example:1 ABCD!\n");
- while(1)
- {
- memset(&msg,0,sizeof(msg.szBuf));
- fprintf(stderr,"send:");
- scanf("%ld%s",&(msg.type),msg.szBuf);
- //read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));
- int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);
- if(nRet<0)
- {
- perror("fail msgsnd");
- break;
- }
- }
- }
- }
消息队列设置
int msgctl(int msqid, int cmd, struct msqid_ds *buf)
参数:
cmd 参数
IPC_STAT 读取内核中 msqid_ds 数据到 buf
IPC_SET 设置 buf 中数据到 msqid_ds
IPC_RMID 移除消息队列, 读写消息队列进程返回 EIDRM
IPC_INFO 返回系统级别的消息队列限制, 保存到 buf, 这里 buf 指向 msginfo 结构数据
MSG_INFO 返回 msginfo 消息, 同时获取资源消耗情况:
msgpool 系统中存在的消息队列数
msgmap 系统中所有消息队列的消息数
msgtql 系统中所有消息队列占用的字节数
MSG_STAT 返回 msginfo 消息
msqid 参数使用内核的消息队列信息
参数三结构体:
- struct msqid_ds {
- struct ipc_perm msg_perm; // 消息队列权限等信息
- time_t msg_stime; //msgsnd 最后调用时间
- time_t msg_rtime; //msgrcv 最后调用时间
- time_t msg_ctime; // 消息队列最后改变的时间
- unsigned long _msg_cbytes; // 消息队列当前消息总长度
- msgqnum_t msg_qnum; // 消息队列中消息个数
- msglen_t msg_qbytes; // 队列能存消息的最大长度
- pid_t msg_lspid; // 最后调用 msgsnd 的进程号
- pid_t msg_lrpid; // 最后调用 msgrcv 的进程
- }
struct ipc_perm 结构体
- struct ipc_perm{
- key_t __key; //msgget 的 key 参数
- uid_t uid // 消息队列所有者 euid
- gid_t gid; // 消息队列所有者 egid
- uid_t cuid; // 消息队列创建者 euid
- gid_t cgid; // 消息队列创建者 egid
- unsigned short mode; // 访问权限
- unsigned short __seq; // 序列号
- }
MSG_INFO 返回 msginfo 消息, 结构体:
- struct msginfo {
- int msgpool; // 系统中存在的消息队列数
- int msgmap; // 系统中所有消息队列的消息数
- int msgmax; // 单条消息中最大长度
- int msgmnb; // 消息队列最大可写入字节数
- int msgmni; // 消息队列可容纳消息条数
- int msgssz; // 消息段大小, 未使用
- int msgtql; // 系统中所有消息队列占用的字节数
- }
内核中维护的消息队列数据格式 (链表)
- struct msg {
- struct msg *msg_next; // 下一个消息节点
- long msg_type; // 消息类型
- ushort msg_ts; // 消息长度
- short msg_spot; // 消息内容 (指针)
- }
例子:
- #define _GNU_SOURCE
- #include <stdio.h>
- #include <sys/msg.h>
- #include <string.h>
- #include <stdlib.h>
- #include <sys/ipc.h>
- #include <unistd.h>
- typedef struct tagMsg
- {
- long type;
- char szBuf[1024];
- }MSG_S;
- void testMsgRS(char rs)
- {
- key_t key=ftok("123",321);
- if(key==-1)
- {
- perror("fail ftok");
- return ;
- }
- printf("key=%d\n",key);
- int msqid=msgget(key,IPC_CREAT|0666);
- if(msqid==-1)
- {
- perror("fail msgget!");
- return ;
- }
- printf("msqid=%d\n",msqid);
- MSG_S msg;
- //read message
- if(rs=='r')
- {
- while(1)
- {
- memset(&msg,0,sizeof(msg.szBuf));
- fprintf(stderr,"type:");
- scanf("%ld",&(msg.type));
- //read(0,&(msg.type),sizeof(long));
- int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type, IPC_NOWAIT);
- if(nRet<0)
- {
- perror("fail msgsnd");
- break;
- }
- printf("receive:%s\n",msg.szBuf);
- }
- }
- //send message
- else if(rs=='s')
- {
- fprintf(stderr,"example:1 ABCD!\n");
- while(1)
- {
- memset(&msg,0,sizeof(msg.szBuf));
- fprintf(stderr,"send:");
- scanf("%ld%s",&(msg.type),msg.szBuf);
- //read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));
- int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);
- if(nRet<0)
- {
- perror("fail msgsnd");
- break;
- }
- }
- }
- }
- // 列举 msqid_ds 的信息
- void printfInfo(struct msqid_ds *pstMsg)
- {
- printf("--------msg_perm------\n");
- printf("msgget key:%#o\n",pstMsg->msg_perm.__key);
- printf("all user euid:%d\n",pstMsg->msg_perm.uid);
- printf("all user egid:%d\n",pstMsg->msg_perm.gid);
- printf("creater euid:%d\n",pstMsg->msg_perm.cuid);
- printf("creater egid:%d\n",pstMsg->msg_perm.cgid);
- printf("R+W mode:%#o\n",pstMsg->msg_perm.mode);
- printf("queue numer:%d\n",pstMsg->msg_perm.__seq);
- printf("\n");
- printf("msgsnd time:%d\n",(int)pstMsg->msg_stime);
- printf("msgrcv time:%d\n",(int)pstMsg->msg_rtime);
- printf("change time:%d\n",(int)pstMsg->msg_ctime);
- printf("message len:%ld\n",pstMsg->msg_cbytes);
- printf("message number:%d\n",(int)pstMsg->msg_qnum);
- printf("queue max len:%d\n",(int)pstMsg->msg_qbytes);
- printf("msgsnd pid:%d\n",(int)pstMsg->msg_lspid);
- printf("msgrcv pid:%d\n",(int)pstMsg->msg_lrpid);
- printf("\n");
- }
- // 提示
- void ListSel()
- {
- printf("\tstat:output message queue infomation\n");
- printf("\tset:set message queue mode\n");
- printf("\texit:delete message queue\n");
- printf("\n");
- }
- // 消息队列设置
- void testMsgctl()
- {
- key_t key=ftok("123",321);
- if(key==-1)
- {
- perror("fail ftok");
- return ;
- }
- printf("key=%d\n",key);
- int msqid=msgget(key,IPC_CREAT|0666);
- if(msqid==-1)
- {
- perror("fail msgget!");
- return ;
- }
- printf("msqid=%d\n",msqid);
- ListSel();
- struct msqid_ds stMsg;
- char szCmd[128];
- // 对消息队列进行操作. 显示, 修改
- while(1)
- {
- fprintf(stderr,"-->");
- scanf("%s",szCmd);
- // 显示
- if(!strcmp(szCmd,"stat"))
- {
- msgctl(msqid, IPC_STAT, &stMsg);
- printfInfo(&stMsg);
- }
- // 修改权限
- else if(!strcmp(szCmd,"set"))
- {
- msgctl(msqid, IPC_STAT, &stMsg);
- int mode;
- printf("now mode:%d\n",stMsg.msg_perm.mode);
- fprintf(stderr,"input new mode:");
- scanf("%o",&mode);
- if(mode<0||mode>0777)
- {
- fprintf(stderr,"mode is invaid\n");
- continue;
- }
- // 修改操作
- stMsg.msg_perm.mode=mode;
- // 修改后更新到消息队列
- int nRet=msgctl(msqid, IPC_SET, &stMsg);
- if(nRet)
- {
- perror("failed IPC_SET");
- continue;
- }
- else
- {
- printf("set mode sucess\n");
- }
- }
- // 退出
- else if(!strcmp(szCmd,"exit"))
- {
- break;
- }
- }
- // 删除消息队列
- fprintf(stderr,"sure to delete this message[y/n]:");
- scanf("%s",szCmd);
- if(!strcmp(szCmd,"y"))
- {
- // 删除操作
- int nRet=msgctl(msqid, IPC_RMID, &stMsg);
- if(nRet)
- {
- perror("failed IPC_RMID");
- }
- else
- {
- printf("delete message queue[%d]sucess!\n",msqid);
- }
- }
- }
- int main(int argc,char** argv)
- {
- /*// 默认参数传入
- if((argc!=2)||(strcmp(argv[1],"s")&&strcmp(argv[1],"r")))
- {
- printf("select [s/r/c]:%s\n",argv[1]);
- printf("\ts:send/receive message\n");
- printf("\tr:receive message\n");
- printf("\tc:control message queue\n");
- return 0;
- }
- if(argv[1][0]=='s'||argv[1][0]=='r')
- {
- testMsgRS(argv[1][0]);
- }
- else
- {
- testMsgctl();
- }*/
- printf("\ts:send/receive message\n");
- printf("\tr:receive message\n");
- printf("\tc:control message queue\n");
- printf("select your choice [s/r/c]:");
- char ch;
- scanf("%c",&ch);
- // 读, 写队列消息
- if(ch=='s'||ch=='r')
- {
- testMsgRS(ch);
- }
- // 修改信息队列权限
- else
- {
- testMsgctl();
- }
- return 0;
- }
来源: http://www.bubuko.com/infodetail-2655515.html