并发控制:进程通信之消息队列


  消息队列是消息的链接表,存储在内核中,用队列标识符标识(XSI的标识符)。消息队列的每个消息对象至少两个对象:消息类型(长整型表示)和消息主体。结构体定义如下:

struct msg
{
    long type;
    char data[50];
};//此消息结构体由用户定义,开头一定是消息类型

  每个消息队列都有一个msgid_ds的结构体(XSI IPC都有)。主要内容如下:

struct msgid_ds
{
struct ipc_perm;
msgnum_t msg_qnum; //number of message on queue
msg_len_t msg_qbytes; //max length of message on queue
pid_t msg_lspid;  //pid of last msgsnd()
pid_t msg_lrpid; //pid of last msgrcv()
};

该结构体的内容可以通过msgctl()获取(使用方法见3.2)。

  消息队列的使用方式通常是:发送方调用msgget()创建消息队列,调用msgsnd()发送消息;接收方调用msgget()获取消息队列ID,调用msgrcv()接收消息,在退出时调用msgctl()删除该消息队列。每个函数的具体使用方法如下:

/*
函数名:msgget
key:IPC的键,通常调用ftok生成或直接指定
msgflg:
    IPC_CREAT:创建新的IPC,如果有,则打开
    IPC_EXCL:通常和IPC_CREAT一起使用,如果IPC已经存在,创建失败,errno显示EEXIST
    关于读写权限见下图
返回值:消息队列ID
*/
int msgget(key_t key, int msgflg);

/*
函数名:msgsnd
msqid:消息队列标识符
msg_ptr:消息格式的结构体
msg_sz:消息主体大小(不包含消息类型)
msgflg:如果为0,则在发送到消息队列前一直等待;如果为IPC_NOWAIT,则当队列满时,出错返回EAGAIN
*/
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);

/*
函数名:msgrcv
参数基与msgsnd一致
msgtype:期望接收的消息类型,如果为0,则表示接收消息队列头的一条消息
*/
msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);

/*
函数名:msgctl
通常用来删除消息队列
*/
int msgctl(int msqid, int command, struct msqid_ds *buf);
//删除消息队列时用法为:
msgctl(msgid,IPC_RMID,NULL);

关于msgget函数第二个参数msgflag,除了IPC_CREAT和IPC_EXCL,还有读写控制权限,如下图:

                       

通常在调用一个XSI IPC时,都用如下形式:

msgget(key,IPC_CREAT|0666);//创建或打开一个消息队列并赋予所有读写权限

下面看一个完整的例子:写进程创建并写入消息;读进程读取消息,并在退出时删除消息队列

 1 //写进程
 2 #include 
 3 #include <string.h>
 4 #include 
 5 #include 
 6 #include 
 7 #include 
 8 #include 
 9 
10 struct MSG
11 {
12     long type;
13     char msg[50];
14 };
15 
16 int main()
17 {
18     key_t key=ftok("/tmp",1);
19     int msgid=msgget(key,IPC_CREAT|0666);
20     if(msgid<0)
21     {
22         printf("msgget error\n");
23         msgid=msgget(key,O_RDONLY); //获取ID
24         msgctl(msgid,IPC_RMID,NULL); //删除消息队列
25         exit(1);
26     }
27     MSG m;
28     char c;
29     while(1)
30     {
31         printf("Input type\n");
32         scanf("%ld",&(m.type));
33         scanf("%c",&c);  //处理换行
34         printf("Input message\n");
35         fgets(m.msg,sizeof(m)-sizeof(long),stdin);
36         if(msgsnd(msgid,(void*)&m,50,0)==EAGAIN)
37             printf("Queue has no volumn\n");
38         if(strncmp(m.msg,"Quit",4)==0)
39             break;
40         
41     }
42     exit(0);
43 }
44 
45 
46 
47 
48 //读进程,头文件与写进程相同
49 struct MSG
50 {
51     long type;
52     char msg[50];
53 };
54 
55 int main()
56 {
57     MSG m;
58     key_t key=ftok("/tmp",1);
59     int msgid=msgget(key,IPC_CREAT|0666);
60     if(msgid<0)
61     {
62         printf("msgget error\n");
63         exit(1);
64     }
65     while(1)
66     {
67         if(msgrcv(msgid,(void*)&m,50,0,0)<0) //接收任何消息类型
68         {
69             printf("receive error\n");
70             if(errno==E2BIG) printf("E2BIG");
71             if(errno==EACCES) printf("EACCES");
72             if(errno==EAGAIN) printf("EAGAIN");
73             if(errno==EFAULT) printf("EFAULT");
74             if(errno==EIDRM) printf("EIDRM");
75             if(errno==EINTR) printf("EINTR");
76             if(errno==EINVAL) printf("EINVAL");
77             if(errno==ENOMSG)printf("ENOMSG");
78             exit(1);
79         }
80         printf("type:%ld msg:%s\n",m.type,m.msg);
81         if(strncmp(m.msg,"Quit",4)==0) break;
82     }
83     msgctl(msgid,IPC_RMID,NULL);
84     exit(0);
85 }

注:消息队列一旦创建,除非显示删除(调用msgctl,或命令行ipcrm -q msgid),将一直存在于内存中。当一个进程删除了某个消息队列后,其他进程再次调用该消息队列将出错。