歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Unix知識 >> Unix基礎知識 >> UNIX環境高級編程:system V消息隊列

UNIX環境高級編程:system V消息隊列

日期:2017/3/3 15:19:47   编辑:Unix基礎知識

unix早期通信機制中的信號能夠傳送的信息量有限,管道則只能傳送無格式字節流,這遠遠是不夠的。

消息隊列(也叫報文隊列)客服了這些缺點:

消息隊列就是一個消息的鏈表。

可以把消息看作一個記錄,具有特定的格式。

進程可以按照一定的規則向消息隊列中添加新消息;另一些進程可以從消息隊列中讀走消息。

消息隊列是隨內核持續的,只有內核重啟或人工刪除時,該消息隊列才會被刪除。

system V消息隊列使用消息隊列標識符標識。具有足夠特權的任何進程都可以往一個給定隊列放置一個消息,具有足夠特權的任何進程都可以從一個給定隊列讀出一個消息。

消息隊列具有一定的FIFO特性,但是它可以實現消息的隨機查詢,比FIFO具有更大的優勢。同時這些消息又存在於內核中,由“隊列”ID來標識消息隊列的實現包括創建或打開消息隊列,添加消息,讀取消息和控制消息隊列這四種操作。

對於系統中的每個消息隊列,內核維護一個定義在<sys/msg.h>頭文件中的信息結構。

struct msqid_ds {  
    struct ipc_perm msg_perm;  
    struct msg *msg_first;      /* first message on queue,unused  */
    struct msg *msg_last;       /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};

我們可以將內核中的某個特定的消息隊列畫為一個消息鏈表,如圖假設有一個具有三個消息的隊列,消息長度分別為1字節,2字節和3字節,而且這些消息就是以這樣的順序寫入該隊列的。再假設這三個消息的類型分別為100,200,300.

1.msgget函數

msgget函數用於創建一個新的消息隊列或訪問一個已存在的消息隊列。

#include <sys/msg.h>  
int msgget(key_t key,int oflag);

返回值是一個整數標識符,其他三個msg函數就用它來指代該隊列。它是基於指定的key產生的,而key即可以是ftok的返回值,也可以是常值IPC_PRIVATE。

oflag是讀寫權限值得組合。它還可以與IPC_CREAT或IPC_CREAT | IPC_EXCL按位或,IPC_NOWAIT --- 讀寫消息隊列要求無法得到滿足時,不阻塞。

當創建一個新消息隊列時,msqid_ds結構的如下成員被初始化。

(1)msg_perm結構的uid和cuid成員被設置成當前進程的有效用戶ID,gid和cgid成員被設置成當前進程的有效組ID。

(2)oflag中的讀寫權限位存放在msg_perm.mode中。

(3)msg_qnum,msg_lspid,msg_lrpid,msg_stime和msg_rtime被置為0.

(4)msg_ctime被設置成當前時間。

(5)msg_qbytes被設置成系統限制值。

huangcheng@ubuntu:~$ ipcs -q  ----查看消息隊列  
      
------ Message Queues --------  
key        msqid      owner      perms      used-bytes   messages

2.msgsnd函數

使用msgget函數打開一個消息隊列後,我們使用msgsnd往其上放置一個消息。

#include <sys/msg.h>  
int msgsnd(int msgid,const void *ptr,size_t length,int flag);

其中msqid是由msgget返回的標識符。ptr是一個結構指針,該結構具有如下模板,它定義在<sys/msg.h>中。

struct msgbuf {  
    long mtype;       /* message type, must be > 0 */
    char mtext[1];    /* message data */
};

msgsnd的length參數以字節為單位指定待發送消息的長度。這是位於長整數消息類型之後的用戶自定義數據的長度(注意:不包括消息類型)。該長度可以是0.

flag參數既可以是0,也可以是IPC_NUWAIT。IPC_NOWAIT標志使得msgsnd調用非阻塞:如果沒有存放新消息的可用空間(即消息隊列已滿),該函數馬上返回。這個條件發生的情形包括:

(1)在指定的隊列中已有太多的字節(對於該隊列的msqid_ds結構中的msg_qbytes值);

(2)在系統范圍存在太多的消息。

如果這兩個條件中有一個存在,而且IPC_NUWAIT標志已指定,msgsnd就返回一個EAGAIN錯誤。如果這兩個條件有一個存在,但是IPC_NUWAIT標志未指定,那麼調用線程被投入睡眠直到:

(1)具備存放新消息的空間;

(2)由msqid標志的消息隊列從系統中刪除(這種情況下返回一個EIDRM錯誤);

(3)調用線程被某個捕獲的信號所中斷(這種情況下返回一個EINTR錯誤)。

3.msgrcv函數

使用msgrcv函數從某個消息隊列中讀取一個消息。

#include <sys/msg.h>  
ssize_t msgrcv(int msqid,void *ptr,size_t lengh,long type,int flag);

其中ptr參數指定所接收消息的存放位置。跟msgsnd一樣,該指針指向緊挨在真正的消息數據之前返回的長整數類型字段。

length指定了由ptr指向的緩沖區中數據部分的大小。這是該函數能返回的最大數據量。該長度不包括長整數類型字段。

type指定希望從所給定的隊列中讀出什麼樣的消息。

(1)如果type為0,那就返回該隊列中的第一個消息,既然每個消息隊列都是作為一個FIFO鏈表維護的,因此type為0指定返回該隊列中最早的消息。

(2)如果type大於0,那就返回其類型值為type的第一個消息。

(3)如果type小於0,那就返回其類型小於或等於type參數的絕對值的消息中類型值最小的第一個消息。

msgrcv的flag參數指定所請求類型的消息不在所指定的隊列中時該做何處理。在沒有消息可得的情況下,如果設置了flag中的IPC_NOWAIT位,msgrcv函數就立即返回一個ENOMSG錯誤。否則,設置了flag為0,調用者被阻塞到下列某個事件發生為止:

(1)有一個所請求類型的消息可獲取;

(2)由msqid標志的消息隊列從系統中刪除(這種情況下返回一個EIDRM錯誤);

(3)調用線程被某個捕獲的信號所中斷(這種情況下返回一個EINTR錯誤)。

flag參數中另有一位可以指定:MSG_NOERROR。當所接收消息的真正數據部分大於length參數時,如果設置了該位,msgrcv函數就只是截短數據部分,而不返回錯誤。否則,ms_grcv返回一個E2BIC錯誤。

成功返回時,msgrcv返回的是所接收消息中數據的字節數。它不包括也通過ptr參數返回的長整數消息類型所需的幾個字節。

4.msgctl函數

msgctl函數提供在一個消息隊列上的各種控制操作。

#include <sys/msg.h>  
int msgctl(int msqid,int cmd,struct msqid_ds *buf);

msgctl函數提供3個命令。

IPC_RMID 從系統中刪除由msqid指定的消息隊列。當前在該隊列上的任何消息都被丟棄。對於該命令而言,msgctl函數的第三個參數被忽略。

IPC_SET 給所指定的消息隊列設置其msgid_ds結構的以下4個成員:msg_perm.uid,msg_perm.gid,msg_perm.mode和msg_qbytes。他們的值來自buff參數指向的結構中的相應的成員。

IPC_STAT 讀取消息隊列的屬性,並將其保存在buf指向的緩沖區中。

msgrcv.c用於接收消息,msgsend.c用於發送消息:

msgrcv.c

/* Here's the receiver program. */
      
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <unistd.h>  
      
#include <sys/msg.h>  
      
      
struct my_msg_st {  
    long int my_msg_type;  
    char some_text[BUFSIZ];  
};  
      
int main()  
{  
    int running = 1;  
    int msgid;  
    struct my_msg_st some_data;  
    long int msg_to_receive = 0;  
      
/* First, we set up the message queue. */
      
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);  
      
    if (msgid == -1) {  
        fprintf(stderr, "msgget failed with error: %d\n", errno);  
        exit(EXIT_FAILURE);  
    }  
      
/* Then the messages are retrieved from the queue, until an end message is encountered. 
 Lastly, the message queue is deleted. */
      
    while(running) {  
        if (msgrcv(msgid, (void *)&some_data, BUFSIZ,msg_to_receive, 0) == -1) {  
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);  
            exit(EXIT_FAILURE);  
        }  
        printf("You wrote: %s", some_data.some_text);  
        if (strncmp(some_data.some_text, "end", 3) == 0)  
            running = 0;  
    }  
      
    if (msgctl(msgid, IPC_RMID, 0) == -1) {  
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");  
        exit(EXIT_FAILURE);  
    }  
      
    exit(EXIT_SUCCESS);  
}

msgsend.c

查看本欄目更多精彩內容:http://www.bianceng.cn/OS/unix/

/* The sender program is very similar to msg1.c. In the main set up, delete the 
 msg_to_receive declaration and replace it with buffer[BUFSIZ], remove the message 
 queue delete and make the following changes to the running loop. 
 We now have a call to msgsnd to send the entered text to the queue. */
      
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <unistd.h>  
      
#include <sys/msg.h>  
      
#define MAX_TEXT 512  
      
struct my_msg_st {  
    long int my_msg_type;  
    char some_text[MAX_TEXT];  
};  
      
int main()  
{  
    int running = 1;  
    struct my_msg_st some_data;  
    int msgid;  
    char buffer[BUFSIZ];  
      
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);  
      
    if (msgid == -1) {  
        fprintf(stderr, "msgget failed with error: %d\n", errno);  
        exit(EXIT_FAILURE);  
    }  
      
    while(running) {  
        printf("Enter some text: ");  
        fgets(buffer, BUFSIZ, stdin);  
        some_data.my_msg_type = 1;  
        strcpy(some_data.some_text, buffer);  
      
        if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1) {  
            fprintf(stderr, "msgsnd failed\n");  
            exit(EXIT_FAILURE);  
        }  
        if (strncmp(buffer, "end", 3) == 0)   
            running = 0;  
     }  
      
    exit(EXIT_SUCCESS);  
}

運行結果:

huangcheng@ubuntu:~$ ./msgsend  
Enter some text: ctt  
Enter some text: huangcheng  
Enter some text: end  
      
huangcheng@ubuntu:~$ ./msgrcv  
You wrote: ctt  
You wrote: huangcheng  
You wrote: end
Copyright © Linux教程網 All Rights Reserved