天天看点

Linux进程间通信 - 消息队列

1.1.  什么是消息队列

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

1.2.  相关的接口函数介绍

Linux提供了一系列消息队列的函数接口来让我们方便地使用它来实现进程间的通信。它的用法与其他两个System V PIC机制,即信号量和共享内存相似。

1、    ftok函数: 动态获取key值

功  能:把从path导出的信息与id的低序8位组合成一个key_t值(也称IPC键)
原  型:key_t ftok(const char  *path, int id);
返回值:成功则返回key, 否则返回 -1
参  数:path:已存在的文件或目录(必须存在)
        id:项目ID,取低序8位值,故有效范围为(1  - 255),
说  明:1)  如果使用同一个项目ID,那么对于不同文件的两个路径名可能产生相同的key值。
        2)  在访问同一共享内存的多个进程先后调用ftok函数的时间段中,如果 path指定的文件(或目录)
            被删除且重新创建,可能得到不一样的键值
        3) 必须path文件(或目录)存在才可创建成功,和path的所属权限无关,即当前用户没有该path的访问
            权限也可以创建成功
原  理:ftok的典型实现调用stat函数,然后组合以下三个值:
        1.path所在的文件系统的信息(stat结构的st_dev成员) 
        2.该文件在本文件系统内的索引节点号(stat结构的st_ino成员),也称i节点
        3.id的低序8位(不能为0)
       key的32位值 = 0X + id的低序8位 + stat.st_dev的低序8位 + stat.st_ino的低序16位      

2、    msgget函数:创建消息

功  能:创建一个新队列或打开一个存在的队列
原  型:int msgget(key_t, key, int msgflg);
返回值:成功则返回与键值key相对应的消息队列描述字qid,否则返回 -1
参  数:key:键值,可由ftok获取,或者直接使用宏定义的整数值,或者IPC_PRIVATE
        msgflg:消息标志:IPC_CREAT、IPC_EXCL
说  明:IPC_PRIVATE:创建一个该进程独占的消息队列,其它进程不能访问该消息队列
       IPC_CREAT:若消息队列不存在,创建一个新的消息队列,否则,返回存在的消息队列
       IPC_CREAT | IPC_EXCL:IPC_EXCL标志本身没有多大意义,与IPC_CREAT一起使用,保证只创建新
            的消息队列,若对应key的消息队列已经存在,则返回错误
       IPC_NOWAIT:小队列以非阻塞的方式获取(若不能获取,立即返回错误)
原  理:1) 如果key == IPC_PRIVATE,则申请一块内存,创建一个新的消息队列(数据结构msqid_ds),
            将其初始化后加入到msgque向量表中的某个空位置处,返回标示符
        2) 在msgque向量表中找键值为key的消息队列,如果没有找到,结果有二:
            msgflag表示不创建新的队列,则错误返回
            msgflag表示要创建新的队列(IPC_CREAT),则创建新消息队列,创建过程如1)
        3) 如果在msgque向量表中找到了键值为key的消息队列,则有以下情况:
            如果msgflg表示一定要创建新的消息队列而且不允许有相同键值的队列存在,则错误返回。
            如果找到的队列是不能用的或已经损坏的队列,则错误返回
            认证和存取权限检查,如果该队列不允许msgflg要求的存取,则错误返回
            正常,返回队列的标识符      

3、    msgsnd函数:发送消息

功  能:发送一个消息到消息对列
原  型:int msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
返回值:0:成功
        -1:非阻塞方式访问满消息队列返回
        EACCES:没有该消息队列写权限
        EAGAIN:msgflg = IPC_NOWAIT,并且已达到整个消息的最大长度(msg_qbytes)
        EFAULT:消息队列地址msgp无法获取
        EIDRM:消息队列已经被删除
        EINTR:消息队列等待写入的时候被中断
        EINVAL:无效msqid、没有活动的消息类型值、无效msgsz值(小于0或大于系统消息队列最大值MSGMAX)
        ENOMEM:内存不够
参  数:msqid:消息队列qid
        msgp:消息内容结构体指针
        msgsz:消息消息内容结构体中,除了消息类型msg_type之后的大小
        msgflg:消息标志:
            0(忽略该标志位,以阻塞的方式发送消息到消息队列)、
            IPC_NOWAIT(以非阻塞的方式发送消息,若消息队列满,函数立即返回EAGAIN)
说  明:msgsz可以优化成实际数据的长度,相比分配空间的长度,可以增加单个队列中的消息个数
原  理:1)  计算id = (unsigned int) msqid % MSGMNI,然后根据id在linux系统消息队列向量msgque[MSGMNI]中
            查找对应的消息队列,并进行认证检查,合法性检查
        2)  如果队列已满,以可中断等待状态(TASK_INTERRUPTIBLE)将当前进程挂起在wwait等待队列(发送
            消息等待队列)上(msgflag==0)
        3)  否则,根据msgbuf的大小申请一块空间,并在其上创建一个消息数据结构struct msg(内核空间),
            将消息缓冲区中的消息内容拷贝到该内存块中消息头的后面(从用户空间拷贝到内核空间)
        4)  将消息数据结构加入到消息队列的队尾,修改队列msqid_ds的相应参数
        5)  唤醒在该消息队列的rwait进程队列(读等待进程队列)上等待读的进程,并返回      

4、    msgrcv函数:接收消息

功  能:从消息队列接收一个消息到msgbuf*
原  型:int msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz,long msgtyp, int msgflg)
返回值:成功返回读取消息的实际字节数
        -1:消息长度大于msgsz
        E2BIG:msgflg ≠ MSG_NOERROR,并且实际消息的长度大于msgsz
        EACCES:没有该消息队列读权限
        EAGAIN:msgflg = IPC_NOWAIT,并且消息队列不可用
        EFAULT:消息队列地址无法获取
        EIDRM:消息队列已经被删除
        EINTR:消息队列等待写入的时候被中断
        EINVAL:msqid无效值、msgsz小于0
        ENOMSG:msgflg = IPC_NOWAIT,并且在消息队列中不存在请求的消息类型msgtyp
参  数:msqid:消息队列qid
        msgp:接收到的消息将要存放的缓冲区
        msgsz:消息数据分配空间的大小
        msgtype:期望接收的消息类型:
            0(获取队列中的第一个消息)、
            大于0(获取具有相同消息类型的第一个信息)、
            小于0(获取类型等于或小于msgtype的绝对值的第一个消息,从类型值最小的消息开始)
        msgflg:消息标志:
            0(忽略)、
            IPC_NOWAIT(如果消息队列为空,不阻塞等待,返回一个EAGAIN)、
            MSG_EXCEPT(取出第一个消息类型大于0,并且消息类型不等于msgtyp的消息)、
            MSG_NOERROR(如果实际消息的长度大于msgsz,对该消息进行截断)
说  明:
原  理:1)  计算id = (unsigned int) msqid % MSGMNI,然后根据id在linux系统消息队列向量msgque[MSGMNI]
            中查找对应的消息队列,并进行认证检查,合法性检查
        2)  根据msgtyp搜索消息队列,情况有二:
            如果找不到所要的消息,则以可中断等待状态(TASK_INTERRUPTIBLE),将当前进程挂起在rwait等
            待队列上;
            如果找到所要的消息,则将消息从队列中摘下,调整队列msqid_ds参数,唤醒该消息队列的wwait进
            程队列上等待写的进程,将消息内容拷贝到用户空间的消息缓冲区msgp中,释放内核中该消息所占用
            的空间,返回      

5、    msgctl函数:控制消息

功  能:对消息队列进行设置以及相关操作,具体操作由cmd指定
原  型:int msgctl (int msqid, int cmd, struct msqid_ds *buf)
返回值:0:成功
        -1:失败
        EACCES:没有读的权限同时,cmd 是IPC_STAT
        EFAULT:buf 指向的地址无效
        EIDRM:在读取中队列被删除
        EINVAL:msgqid无效, 或者msgsz 小于0
        EPERM:IPC_SET或者IPC_RMID 命令被使用,但调用程序没有写的权限
参  数:msqid:消息队列qid
        cmd:消息队列执行的操作:
            IPC_STAT(取取消息队列的结构体msqid_ds,并将其存储在buf指定的地址中)、 
            IPC_SET(设置消息队列的数据结构msqid_ds中的msg_perm 成员的值。这个值取自buf参数)
            IPC_EMID(从系统内核中移走消息队列)
        buf:消息队列msqid_ds结构体指针
说  明:1) 消息队列中的数据结构中唯一可以改动的元素就是ipc_perm。它包括队列的存取权限和关于队列创
            建者和拥有者的信息。你可以改变用户的uid、用户的组gid以及消息队列的存取权限mode      

1.3.  内核限制

消息队列是IPC资源信息中的一种,所以可以通过ipcs确定系统的当前IPC限制及已使用的资源状况。

查看IPC:ipcs  [-u](查看当前的IPC资源状况)、[-l](可以查看IPC限制值)

创建IPC:ipcmk  [ [-q msqid] [-m shmid] [-s semid] [-Qmsgkey] [-M shmkey] [-S semkey] ... ]

删除IPC:ipcrm  [[-Q] [-S <size>] [-M <nsems>]]

# ipcs -l
 
------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 67108864
max total shared memory (kbytes) = 17179869184
min seg size (bytes) = 1
 
------ Semaphore Limits --------
max number of arrays = 128
max semaphores per array = 250
max semaphores system wide = 32000
max ops per semop call = 32
semaphore max value = 32767
 
------ Messages: Limits --------
max queues system wide = 10                 【系统最多的消息队列数量(最多支持同时10个消息队列)】
max size of message (bytes) = 2048          【单个消息的最大字节数2048】
default max size of queue (bytes) = 65536   【默认的单个队列的大小65536】      

修改消息队列参数如下:

1、  永久修改:

在root用户下修改/etc/sysctl.conf配置文件

具体方法为:在sysctl.conf中加上 kernel.msgmni=10kernel.msgmax=2048 kernel.msgmnb=65535,然后运行sysctl –p即可修改。

在Red Hat中,rc.sysinit初始化脚本将自动读取/etc/sysctl.conf文件

而在SUSE Linux中,还需要激活 boot.sysctl服务(chkconfig boot.sysctl on)

2、  临时修改:

root用户下sysctl -w kernel.msgmni=10 kernel.msgmax=2048 kernel.msgmnb=65535

1.4.  实例

发送信息的程序MsgSend.c的源代码如下:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>

#define MSGKEY 1024
#define MAX_TEXT 512  
struct msg_st
{
    long int msg_type;      //第一个参数必须是long的消息类型
    char text[MAX_TEXT];    //此后可以定义多个数组和其他值
};

#define SPACE "                "
#define PRINT_X(var1, var2) printf(""#var2"%.*s: 0X%08X\n", \
        strlen(SPACE) - strlen(#var2), SPACE, var1.var2)
#define PRINT_D(var1, var2) printf(""#var2"%.*s: %d\n", \
        strlen(SPACE) - strlen(#var2), SPACE, var1.var2)

void Show(int msqid)
{
    int iRet = 0;
    struct msqid_ds buf;
    
    iRet = msgctl(msqid, IPC_STAT, &buf);
    if(0 != iRet)
    {
        printf("msgctl error, errno=%d [%s]\n", errno, strerror(errno));
        return ;
    }
    
    printf("\n");
    PRINT_X(buf.msg_perm, __key);
    PRINT_D(buf.msg_perm, uid);
    PRINT_D(buf.msg_perm, gid);
    PRINT_D(buf.msg_perm, cuid);
    PRINT_D(buf.msg_perm, cgid);
    PRINT_X(buf.msg_perm, mode);
    PRINT_X(buf.msg_perm, __seq);
    
    PRINT_X(buf, msg_stime);
    PRINT_X(buf, msg_rtime);
    PRINT_X(buf, msg_ctime);
    PRINT_D(buf, __msg_cbytes);
    PRINT_D(buf, msg_qnum);
    PRINT_D(buf, msg_qbytes);
    PRINT_X(buf, msg_lspid);
    PRINT_X(buf, msg_lrpid);
    printf("\n");
}


int main()  
{
    int running = 1;
    struct msg_st data;
    char buffer[MAX_TEXT];
    int msgid = -1;

    //建立消息队列
    msgid = msgget(MSGKEY, 0666 | IPC_CREAT);
    if(msgid == -1)  
    {
        fprintf(stderr, "msgget failed, error=%d [%s]\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    
    Show(msgid);

    //向消息队列中写消息,直到写入end
    while(running)
    {
        //输入数据
        printf("Enter some text: ");
        fgets(buffer, MAX_TEXT, stdin);
        data.msg_type = 1;    //注意此处设置消息类型
        strcpy(data.text, buffer);
        
        /* 向队列发送数据, 如果是字符串,则参数3可以修改为字符串的长度,可以节省内存
           msgsnd(msgid, (void*)&data, strlen(data.text), 0) */
        if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)
        {
            fprintf(stderr, "msgsnd failed, error=%d [%s]\n", errno, strerror(errno));
            exit(EXIT_FAILURE);
        }

        Show(msgid);
        
        //输入end结束输入
        if(strncmp(buffer, "end", 3) == 0)
        {
            running = 0;
        }
        sleep(1);
    }
    
    exit(EXIT_SUCCESS);
}
           

接收信息的程序MsgReceive.c的源代码如下:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>

#define MSGKEY 1024
#define MAX_TEXT 512
struct msg_st
{
    long int msg_type;
    char text[MAX_TEXT];
};

int main()
{
    int running = 1;
    int msgid = -1;
    struct msg_st data;
    long int msgtype = 0; //此处设置为0,表示接收所有消息

    //建立消息队列
    msgid = msgget(MSGKEY, 0666 | IPC_CREAT);
    if(msgid == -1)
    {  
        fprintf(stderr, "msgget failed, error=%d [%s]\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    
    //从队列中获取消息,直到遇到end消息为止
    while(running)
    {
        if(msgrcv(msgid, (void*)&data, MAX_TEXT, msgtype, 0) == -1)
        {
            fprintf(stderr, "msgrcv failed, error=%d [%s]\n", errno, strerror(errno));
            exit(EXIT_FAILURE);
        }
        
        printf("You wrote: %s\n",data.text);
        
        //遇到end结束
        if(strncmp(data.text, "end", 3) == 0)
        {
            running = 0;
        }
    }
    
    //删除消息队列
    if(msgctl(msgid, IPC_RMID, 0) == -1)
    {
        fprintf(stderr, "msgctl(IPC_RMID) failed, error=%d [%s]\n", 
                errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    
    exit(EXIT_SUCCESS);
}
           

1.5.  注意事项

一、每次msgrcv一个消息时:
1、那个消息会在内核中移除 
2、每次msgrcv都只会给一个消息出来,不管你用多大的buf来接收,都是可以的。如果msgrcv的bufSize小于实际的该消息的大小,那么可以设置一个标志(MSG_NOERROR):表示截断。 如果不设置,那么会报错,取不出来
   消息满了,则默认0为阻塞,直到有了空间位置,才能snd消息进入到内核
   消息空了,则默认0为阻塞,直到有了一个消息位置,才能 rcv消息进入到进程内存
二、如果指定msgflg = MSG_NOERROR,如果函数取得的消息长度大于msgsz,将只返回msgsz 长度的信息,剩下的部分被丢弃了。如果不指定这个参数,E2BIG 将被返回,而消息则留在队列中不被取出
三、如果使用msgctl控制单个队列的大小(msg_qbytes),当设置的值小于系统设置的值时,可以生效;当设置的值大于系统设置的值时,设置会失败,此时还是原来系统设置的值      

继续阅读