0%

【OS】HDU-OS-Lab3-Linux进程管理(三)消息队列

实验三的知识点是进程通信,进程通信的方式多种多样,既包括锁机制、信号量机制在内的低级通信方式,低级在于其交换的信息量少且效率较低,又包括共享服务器、消息传递系统、管道通信以及客户-服务器系统通信在内的高级通信方式,本实验是实验三的第三个部分,介绍了利用消息队列通信机制实现两个线程间通信的方法。

源码地址:

https://github.com/leslievan/Operator_System/tree/master/Operator_System_Lab3/Operator_System_Exp3_3

点此下载完整源码

基本介绍

消息缓冲队列通信机制是消息传递系统通信中直接通信方式的一种具体实现,基本思想为:通常由系统一管理一组用于通信的空闲消息缓冲区;某进程要发送消息时,首先在自己的地址空间中设置一个发送区,并把欲发送的消息填入其中形成消息,再申请一个消息缓冲区,把数据从发送区复制到消息缓冲区中,然后再把该消息缓冲区直接发送到接收进程的消息队列里;接收进程从自己的消息队列上取下消息缓冲区,并将其中的数据复制到自己的消息接收区中,最后释放消息缓冲区。

实验内容

利用Linux的消息队列通信机制实现两个线程间的通信

编写程序创建三个线程:sender1线程、sender2线程和receive线程,三个线程的功能描述如下:

  1. sender1线程:允许函数sender1(),它创建一个消息队列,然后等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end1”,并且等待 receiver的应答,等到应答消息后,接收到的应答信息显示在终端屏幕上,结束线程的运行。
  2. sender2线程:运行函数sender2(),共享sender1创建的消息队列,等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end2“,并且等待 receiver的应答,等到应答消息后将接收到的应答信息显示在终端屏幕上,结束线程的运行。
  3. Receiver线程:运行函数receive(),它通过消息队列接收来自sender1sender2两个线程的消息,将消息显示在终端屏幕上,当收到内容为“end1”的消息时,就向sender1发送一个应答消息“over1”;当收到内容为“end2”的消息时,就向sender2发送一个应答消息“over2”;消息接收完成后删除消息队列,结束线程的运行。选择合适的信号量机制实现三个线程之间的同步与互斥。

该实验主要难点在于消息队列的相关操作函数和对于三个线程之间的同步互斥关系。

消息队列

消息队列需要自定义一个消息缓冲区,这里设计一个只包含两个成员变量的结构体作为消息缓冲区:

1
2
3
4
struct msg_st {
long int message_type;
char buffer[MAX_SIZE + 1];
};

其中message_type为消息种类,buffer用来储存消息的数据段,最大可存储MAX_SIZE大小,+1操作为了给结尾留出\0

消息队列的相关操作有:

  1. int msgget(key_t key, int msgflg);获得消息队列的特征标识符。
    函数需要两个参数,keymsgflg
    key是该消息队列的全局唯一标识符,通过该标识符可以定位消息队列,对其进行相关操作。
    msgflg为权限控制,决定对消息队列进行的操作,一般使用0666 | IPC_CREAT,熟悉文件系统的可以知道,0666表示文件所有者、同组用户和其他用户对于该消息队列的权限均为可读可写,后面对常量IPC_CREAT进行的位运算作用是“若该队列未被创建则创建它”。(对于该函数可以简单理解为创建消息队列)
  2. ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);消息队列的接收操作。
    函数需要五个参数,msqidmsgpmsgszmsgtypmsgflg
    msqid是函数msgget的返回值,用于表示对哪一个消息队列进行操作。
    msgp是接收消息的指针,指向消息结构体msg_st
    msgsz是接收消息的大小,这里可以看作结构体msg_st中数据段的大小。
    msgtyp是接收消息的类别,函数可以接收指定类别的消息,默认为0,忽视类别,接收队首消息,正值和负值有不同含义,详情查看附录
    msgflg同函数msgget中的msgflg,这里可以直接使用0
    函数的返回值为实际接收到的消息字节数。
  3. int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);消息队列的发送操作。
    函数需要四个参数,msqidmsgpmsgszmsgflg
    参数含义可参考函数msgrcv
  4. int msgctl(int msqid, int cmd, struct msqid_ds *buf);消息队列的控制操作。
    函数需要三个参数,msqidcmdbuf
    msqid同上。
    cmd对消息队列的操作进行选择,需要用到的是IPC_RMID,用于移除msqid指向的消息队列,详情查看附录
    buf为消息队列进行操作的参数,这里不需用到,详情请查看附录

这部分书上应该更详细,可以看书。

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int mq;
struct msg_st buf;
ssize_t bytes_read, bytes_write;
int MAX_SIZE = 128;

/* Create a queue / Get a queue
* int msgget(key_t key, int msgflg)
*/
mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);

/* Get message from queue, place the message to buf
* ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
*/
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
/* Get specific type message, such like 1 */
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 1, 0);

/* Send message to queue with message in buf
* int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
*/
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
/* Send specific type message, suck like 1 */
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 1, 0);

/* Remove(Destroy) the queue
* int msgctl(int msqid, int cmd, struct msqid_ds *buf)
*/
msgctl(mq, IPC_RMID, &t);

同步互斥关系

senderreceiver之间的进程同步比较简单,临界资源为消息队列。是有:

  1. receiver接收消息,sender发送消息,receiversender存在同步关系,使用full=0empty=1进行约束;
  2. sender之间存在互斥关系,两个发送线程不能同时工作 ,使用w_mutex=1进行约束;
  3. receiver等待发送进程结束后,返回应答,sender收到应答后进行输出,receiversender存在同步关系,使用over=0进行约束;
  4. 这里对于终端输出也进行了约束,使用rcv_dpsnd_dp进行约束,可以忽视这部分的处理,这一部分只是为了美观。

代码实现

发送线程

sender1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void *sender1() {
int mq;
struct msg_st buf;
ssize_t bytes_read;

/* open the mail queue */
mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
CHECK((key_t) -1 != mq);

do {
P(w_mutex); /* 进入互斥区 */
P(snd_dp); /* 这部分只是为了美观 */
printf("sender1> ");
V(rcv_dp);
fflush(stdout);

fgets(buf.buffer, BUFSIZ, stdin); /* 接收终端输入,置入buf.buffer */
buf.message_type = snd_to_rcv1; /* 设置消息类别,接收线程据此判断消息来源 */

/* send the message */
P(empty); /* 使用empty和full进行同步 */
CHECK(0 <= msgsnd(mq, (void*)&buf, MAX_SIZE, 0)); /* Check用于检测参数是否成立,不成立则报错,用于Debug,可忽视其实现,作为宏定义,定义在头文件中 */
V(full);
V(w_mutex);
/* 换行为了使线程随机切换,勿随意删除 */
printf("\n");
} while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))); /* Terminate when receive 'exit' */

/* wait for response */
P(over);
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, rcv_to_snd1, 0);
CHECK(bytes_read >= 0);
printf("%s", buf.buffer);
printf("--------------------------------------------\n");
V(snd_dp);
pthread_exit(NULL);
}

接收线程

receiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void *receiver() {
struct msg_st buf, over1, over2;
int mq, must_stop = 2;
struct msqid_ds t;

/* 定义两个结束信号 */
over1.message_type = 3;
strcpy(over1.buffer, "over1\n");
over2.message_type = 4;
strcpy(over2.buffer, "over2\n");

/* open the mail queue */
mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
CHECK((key_t) -1 != mq);

do {
ssize_t bytes_read, bytes_write;
/* receive the message */
P(full);
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
V(empty);

CHECK(bytes_read >= 0);
/* 接收到"exit"信号 */
if (!strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))) {
if (buf.message_type == 1) {
/* 写入并发送退出消息 */
bytes_write = msgsnd(mq, (void *) &over1, MAX_SIZE, 0);
CHECK(bytes_write >= 0);
V(over);
must_stop--;
} else if (buf.message_type == 2) {
/* 写入并发送退出消息 */
bytes_write = msgsnd(mq, (void *) &over2, MAX_SIZE, 0);
CHECK(bytes_write >= 0);
V(over);
must_stop--;
}
} else {
/* 正常处理消息 */
P(rcv_dp);
printf("Received%d: %s", buf.message_type, buf.buffer);
printf("--------------------------------------------\n");
V(snd_dp);
}
} while (must_stop); /* Terminate when no thread */


/* cleanup */
P(snd_dp);
CHECK(!msgctl(mq, IPC_RMID, &t));
pthread_exit(NULL);
}

实验结果

实验结果如下:

Result

附录

msgrcv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
MSGRCV(3P)                   POSIX Programmer's Manual                   MSGRCV(3P)

PROLOG
This manual page is part of the POSIX Programmer's Manual. The Linux imple‐
mentation of this interface may differ (consult the corresponding Linux man‐
ual page for details of Linux behavior), or the interface may not be imple‐
mented on Linux.

NAME
msgrcv — XSI message receive operation

SYNOPSIS
#include

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);

DESCRIPTION
The msgrcv() function operates on XSI message queues (see the Base Defini‐
tions volume of POSIX.1‐2008, Section 3.225, Message Queue). It is unspeci‐
fied whether this function interoperates with the realtime interprocess com‐
munication facilities defined in Section 2.8, Realtime.

The msgrcv() function shall read a message from the queue associated with
the message queue identifier specified by msqid and place it in the user-de‐
fined buffer pointed to by msgp.

The application shall ensure that the argument msgp points to a user-defined
buffer that contains first a field of type long specifying the type of the
message, and then a data portion that holds the data bytes of the message.
The structure below is an example of what this user-defined buffer might
look like:

struct mymsg {
long mtype; /* Message type. */
char mtext[1]; /* Message text. */
}

The structure member mtype is the received message's type as specified by
the sending process.

The structure member mtext is the text of the message.

The argument msgsz specifies the size in bytes of mtext. The received mes‐
sage shall be truncated to msgsz bytes if it is larger than msgsz and (ms‐
gflg & MSG_NOERROR) is non-zero. The truncated part of the message shall be
lost and no indication of the truncation shall be given to the calling
process.

If the value of msgsz is greater than {SSIZE_MAX}, the result is implementa‐
tion-defined.

The argument msgtyp specifies the type of message requested as follows:

* If msgtyp is 0, the first message on the queue shall be received.

* If msgtyp is greater than 0, the first message of type msgtyp shall be
received.

* If msgtyp is less than 0, the first message of the lowest type that is
less than or equal to the absolute value of msgtyp shall be received.

The argument msgflg specifies the action to be taken if a message of the de‐
sired type is not on the queue. These are as follows:

* If (msgflg & IPC_NOWAIT) is non-zero, the calling thread shall return
immediately with a return value of −1 and errno set to [ENOMSG].

* If (msgflg & IPC_NOWAIT) is 0, the calling thread shall suspend execu‐
tion until one of the following occurs:

-- A message of the desired type is placed on the queue.

-- The message queue identifier msqid is removed from the system; when
this occurs, errno shall be set to [EIDRM] and −1 shall be returned.

-- The calling thread receives a signal that is to be caught; in this
case a message is not received and the calling thread resumes execu‐
tion in the manner prescribed in sigaction().

Upon successful completion, the following actions are taken with respect to
the data structure associated with msqid:

* msg_qnum shall be decremented by 1.

* msg_lrpid shall be set to the process ID of the calling process.

* msg_rtime shall be set to the current time, as described in Section
2.7.1, IPC General Description.

RETURN VALUE
Upon successful completion, msgrcv() shall return a value equal to the num‐
ber of bytes actually placed into the buffer mtext. Otherwise, no message
shall be received, msgrcv() shall return −1, and errno shall be set to indi‐
cate the error.

msgctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
MSGCTL(3P)                   POSIX Programmer's Manual                   MSGCTL(3P)

PROLOG
This manual page is part of the POSIX Programmer's Manual. The Linux imple‐
mentation of this interface may differ (consult the corresponding Linux man‐
ual page for details of Linux behavior), or the interface may not be imple‐
mented on Linux.

NAME
msgctl — XSI message control operations

SYNOPSIS
#include

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

DESCRIPTION
The msgctl() function operates on XSI message queues (see the Base Defini‐
tions volume of POSIX.1‐2008, Section 3.225, Message Queue). It is unspeci‐
fied whether this function interoperates with the realtime interprocess com‐
munication facilities defined in Section 2.8, Realtime.

The msgctl() function shall provide message control operations as specified
by cmd. The following values for cmd, and the message control operations
they specify, are:

IPC_STAT Place the current value of each member of the msqid_ds data
structure associated with msqid into the structure pointed to by
buf. The contents of this structure are defined in .

IPC_SET Set the value of the following members of the msqid_ds data
structure associated with msqid to the corresponding value found
in the structure pointed to by buf:

msg_perm.uid
msg_perm.gid
msg_perm.mode
msg_qbytes

Also, the msg_ctime timestamp shall be set to the current time,
as described in Section 2.7.1, IPC General Description.

IPC_SET can only be executed by a process with appropriate priv‐
ileges or that has an effective user ID equal to the value of
msg_perm.cuid or msg_perm.uid in the msqid_ds data structure as‐
sociated with msqid. Only a process with appropriate privileges
can raise the value of msg_qbytes.

IPC_RMID Remove the message queue identifier specified by msqid from the
system and destroy the message queue and msqid_ds data structure
associated with it. IPC_RMD can only be executed by a process
with appropriate privileges or one that has an effective user ID
equal to the value of msg_perm.cuid or msg_perm.uid in the
msqid_ds data structure associated with msqid.

RETURN VALUE
Upon successful completion, msgctl() shall return 0; otherwise, it shall re‐
turn −1 and set errno to indicate the error.

相关阅读