并发控制:进程通信之消息队列
消息队列是消息的链接表,存储在内核中,用队列标识符标识(XSI的标识符)。消息队列的每个消息对象至少两个对象:消息类型(长整型表示)和消息主体。结构体定义如下:
struct msg { long type; char data[50]; };//此消息结构体由用户定义,开头一定是消息类型
每个消息队列都有一个msgid_ds的结构体(XSI IPC都有)。主要内容如下:
struct msgid_ds { struct ipc_perm; msgnum_t msg_qnum; //number of message on queue msg_len_t msg_qbytes; //max length of message on queue pid_t msg_lspid; //pid of last msgsnd() pid_t msg_lrpid; //pid of last msgrcv() };
该结构体的内容可以通过msgctl()获取(使用方法见3.2)。
消息队列的使用方式通常是:发送方调用msgget()创建消息队列,调用msgsnd()发送消息;接收方调用msgget()获取消息队列ID,调用msgrcv()接收消息,在退出时调用msgctl()删除该消息队列。每个函数的具体使用方法如下:
/* 函数名:msgget key:IPC的键,通常调用ftok生成或直接指定 msgflg: IPC_CREAT:创建新的IPC,如果有,则打开 IPC_EXCL:通常和IPC_CREAT一起使用,如果IPC已经存在,创建失败,errno显示EEXIST 关于读写权限见下图 返回值:消息队列ID */ int msgget(key_t key, int msgflg); /* 函数名:msgsnd msqid:消息队列标识符 msg_ptr:消息格式的结构体 msg_sz:消息主体大小(不包含消息类型) msgflg:如果为0,则在发送到消息队列前一直等待;如果为IPC_NOWAIT,则当队列满时,出错返回EAGAIN */ int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg); /* 函数名:msgrcv 参数基与msgsnd一致 msgtype:期望接收的消息类型,如果为0,则表示接收消息队列头的一条消息 */ msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg); /* 函数名:msgctl 通常用来删除消息队列 */ int msgctl(int msqid, int command, struct msqid_ds *buf); //删除消息队列时用法为: msgctl(msgid,IPC_RMID,NULL);
关于msgget函数第二个参数msgflag,除了IPC_CREAT和IPC_EXCL,还有读写控制权限,如下图:
通常在调用一个XSI IPC时,都用如下形式:
msgget(key,IPC_CREAT|0666);//创建或打开一个消息队列并赋予所有读写权限
下面看一个完整的例子:写进程创建并写入消息;读进程读取消息,并在退出时删除消息队列
1 //写进程 2 #include3 #include <string.h> 4 #include 5 #include 6 #include 7 #include 8 #include 9 10 struct MSG 11 { 12 long type; 13 char msg[50]; 14 }; 15 16 int main() 17 { 18 key_t key=ftok("/tmp",1); 19 int msgid=msgget(key,IPC_CREAT|0666); 20 if(msgid<0) 21 { 22 printf("msgget error\n"); 23 msgid=msgget(key,O_RDONLY); //获取ID 24 msgctl(msgid,IPC_RMID,NULL); //删除消息队列 25 exit(1); 26 } 27 MSG m; 28 char c; 29 while(1) 30 { 31 printf("Input type\n"); 32 scanf("%ld",&(m.type)); 33 scanf("%c",&c); //处理换行 34 printf("Input message\n"); 35 fgets(m.msg,sizeof(m)-sizeof(long),stdin); 36 if(msgsnd(msgid,(void*)&m,50,0)==EAGAIN) 37 printf("Queue has no volumn\n"); 38 if(strncmp(m.msg,"Quit",4)==0) 39 break; 40 41 } 42 exit(0); 43 } 44 45 46 47 48 //读进程,头文件与写进程相同 49 struct MSG 50 { 51 long type; 52 char msg[50]; 53 }; 54 55 int main() 56 { 57 MSG m; 58 key_t key=ftok("/tmp",1); 59 int msgid=msgget(key,IPC_CREAT|0666); 60 if(msgid<0) 61 { 62 printf("msgget error\n"); 63 exit(1); 64 } 65 while(1) 66 { 67 if(msgrcv(msgid,(void*)&m,50,0,0)<0) //接收任何消息类型 68 { 69 printf("receive error\n"); 70 if(errno==E2BIG) printf("E2BIG"); 71 if(errno==EACCES) printf("EACCES"); 72 if(errno==EAGAIN) printf("EAGAIN"); 73 if(errno==EFAULT) printf("EFAULT"); 74 if(errno==EIDRM) printf("EIDRM"); 75 if(errno==EINTR) printf("EINTR"); 76 if(errno==EINVAL) printf("EINVAL"); 77 if(errno==ENOMSG)printf("ENOMSG"); 78 exit(1); 79 } 80 printf("type:%ld msg:%s\n",m.type,m.msg); 81 if(strncmp(m.msg,"Quit",4)==0) break; 82 } 83 msgctl(msgid,IPC_RMID,NULL); 84 exit(0); 85 }
注:消息队列一旦创建,除非显示删除(调用msgctl,或命令行ipcrm -q msgid),将一直存在于内存中。当一个进程删除了某个消息队列后,其他进程再次调用该消息队列将出错。