Message Queue is one of the IPC(Inter process communication).
What is message queue in IPC ?
It is queue that is used from inter process communication and it exists independently(if sender/es or receiver/es processes are terminated they will not effect the life of MQ) , It sends “block of data” and each block of data has a type.
Advantage:
- It is very easy and efficient way of communication.
- MQ exists independently of both sender and receiver process.
- We don’t have to think about of opening and closing of MQ like pipe.
Disadvantage:
- It has restriction on size of each block of data.
- It has restriction on number of message on queue.
Diagram:
Header file:
#include<sys/msg.h>
API’s:
int msgget((key_t)key, int msgflag); => create and access.
- key is the integer, it is name of MQ, it should be unique in running system.
- msgflag: this is related to permission, 6(Read+ Write) and IPC_CREAT is used to create a MQ but it will be ignored if MQ exists.
Example: int msgId = msgget((key_t)123456, 0666 | IPC_CREAT);
int msgsnd(int msgId, const void* msgptr, size_t msgSize, int msgFlag); => to send a block of data.
- msgID is the ID of MQ that was generated by msgget(), it is identity of MQ.
- msgptr is the pointer to “structure of message” that we want to send.
//Message structure struct my_msg { long int type; = This is type of message, its mandatory field. char text[gSize]; = This is actual message, it could be array or int or or other structure based on message that we want to send. };
- msgSize is the size of message, it does not include “long int type” of message.
- msgFlag is the flag of IPC_NOWAIT, if its set(NO wait) then send api will not wait and return immediately with -1 , if space is not available on MQ otherwise (IPC_NOWAIT is clear) api will wait and suspend until space is not available.
Example int ret = msgsnd(msgId, (void*)&data, gSize, 0);
int msgrcv(int msgId, void* msg_ptr, size_t gSize, long int msgType, int msgflag); => to receive a block of data
- msgID is the ID of MQ that was generated by msgget(), it is identity of MQ.
- msgptr is the pointer to “structure of message” that we want to receive.
- msgSize is the size of message, it does not include “long int type” of message.
- msgType is long int type and it defines the priority of message, it has 3 value:
- 0: first available message on queue will be retrieved.
- >0: Message with “same message type” will be retrieved, if value is 5 then only message with type 5 will be picked.
- <0: Message with “same message type” and all smaller type value will be retrieved, if value is -5 then message with type 5 and all smaller will be picked.
- msgFlag is the flag of IPC_NOWAIT, if its set(NO wait) then send api will not wait and return immediately with -1 , if space is not available on MQ otherwise (IPC_NOWAIT is clear) api will wait and suspend until space is not available.
Example: int ret = msgrcv(msgId, (void*)&data, gSize, msgToReceive, 0);
int msgctl(int msgId, int command, struct msqid_ds * buf); => control commands
- msgID is the ID of MQ that was generated by msgget(), it is identity of MQ.
- command is the action we want to perform.
- IPC_RMID: to remove the MQ
- IPC_SET:
- IPC_STAT:
Example int ret = msgctl(msgId, IPC_RMID, 0);
Source Code:
Sender Code:
#include<iostream> #include<string.h> #include<sys/msg.h> //MAX TEXT size static const int gSize = 512; bool gRunning = true; //Message structure struct my_msg { long int type; char text[gSize]; }; int main() { std::cout<<"In Main"<<std::endl; my_msg data; char buffer[gSize]; //create MQ int msgId = msgget((key_t)123456, 0666 | IPC_CREAT); if(msgId == -1) { std::cout<<"MQ Creation Failed"<<std::endl; return -1; } while(gRunning) { std::cout<<"Enter the message that you want send or enter end to stop "<<std::endl; std::cin>>buffer; data.type = 1; //this type of message strcpy(data.text, buffer); //send the message/ drop the message in queue if(msgsnd(msgId, (void*)&data, gSize, 0) == -1) { std::cout<<"MQ Send failed"<<std::endl; return -1; } if(strncmp(buffer, "end", 3) == 0) gRunning = false; } return 0; }
Reciever Code:
#include<iostream> #include<string.h> #include<sys/msg.h> //MAX TEXT size static const int gSize = 512; bool gRunning = true; //Message structure struct my_msg { long int type; char text[gSize]; }; int main() { std::cout<<"In Main"<<std::endl; my_msg data; char buffer[gSize]; long int msgToReceive = 0; //First available message on queue will be retrieved //Create the MQ or if MQ is exists then return the existing one int msgId = msgget((key_t)123456, 0666 | IPC_CREAT); if(msgId == -1) { std::cout<<"MQ Creation Failed"<<std::endl; return -1; } while(gRunning) { //Retrive the message from MQ if(msgrcv(msgId, (void*)&data, gSize, msgToReceive, 0) == -1) { std::cout<<"MQ Recieved Failed"<<std::endl; return -1; } std::cout<<data.text<<std::endl; if(strncmp(data.text, "end", 3)==0) { gRunning = false; } } //Delete the MQ if(msgctl(msgId, IPC_RMID, 0) == -1) { std::cout<<"MQ delete Failed"<<std::endl; return -1; } return 0; }
Output Code with executable commands:
First start the receiver executable as backed task.
alpha:~/IPC/MQ$ ls reciever.cpp sender.cpp alpha:~/IPC/MQ$ g++ -o rec reciever.cpp alpha:~/IPC/MQ$ ls rec reciever.cpp sender.cpp alpha:~/IPC/MQ$ ./rec & [1] 8551 alpha:~/IPC/MQ$ In Main Reciever is waiting for message from MQ, IPC_NOWAIT is clear DigestCPP Reciever is waiting for message from MQ, IPC_NOWAIT is clear CPP Reciever is waiting for message from MQ, IPC_NOWAIT is clear end [1]+ Done ./rec
Second start the sender executable.
alpha:~$ cd IPC/MQ alpha:~/IPC/MQ$ g++ -o sen sender.cpp alpha:~/IPC/MQ$ ls rec reciever.cpp sen sender.cpp alpha:~/IPC/MQ$ ./sen In Main Enter the message that you want send or enter end to stop DigestCPP Enter the message that you want send or enter end to stop CPP Enter the message that you want send or enter end to stop end alpha:~/IPC/MQ$
MQ is exist independently in system so we can check the status using below commands:
MQ life does not dependent on sender or receiver life so we can check the status of MQ.
ipcs -q
s is for “status”
q is for “queue”
rm is for “remove”
alpha:~/IPC/MQ$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0x0001e240 98304 alpha 666 1024 2 alpha:~/IPC/MQ$ ipcrm -q 98304 alpha:~/IPC/MQ$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages alpha:~/IPC/MQ$