DigestCPP

Message Queue

A message queue is one of the System V IPC (inter-process communication) mechanisms.

What is a message queue in IPC?

It is a queue used for inter-process communication. It exists independently of the processes that use it: if the sender or receiver processes terminate, the queue and any unread messages remain until the queue is removed explicitly or the system restarts. Data is sent as discrete blocks (messages), and each message carries a type.

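Advantages:

  • It is an easy and efficient way to pass data between processes.
  • The MQ exists independently of both the sender and the receiver process.
  • Unlike a pipe, there is no open/close step to coordinate between processes; any process that knows the key and has permission can attach with msgget().

Disadvantages:

  • There is a system-imposed limit on the size of each message.
  • There is a per-queue limit on the total bytes held, which caps the number of messages that can wait on the queue.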
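
To make the two limits above more concrete, here is a minimal sketch that reads a queue's byte limit and its current message count through msgctl() with IPC_STAT. The names QueueLimits and readQueueLimits are hypothetical and exist only for this illustration; msg_qbytes and msg_qnum are fields of the standard msqid_ds structure. The system-wide per-message size limit is not reported by this call.

```cpp
#include <sys/ipc.h>
#include <sys/msg.h>

// Hypothetical helper type for this sketch: a snapshot of one queue's limits.
struct QueueLimits
{
	unsigned long maxBytes;   // msg_qbytes: most payload bytes the queue may hold
	unsigned long messages;   // msg_qnum: messages currently waiting on the queue
};

// Fills 'out' from IPC_STAT. Returns true on success, false if msgctl() fails.
bool readQueueLimits(int msgId, QueueLimits& out)
{
	msqid_ds info;
	if (msgctl(msgId, IPC_STAT, &info) == -1)
	{
		return false;
	}
	out.maxBytes = static_cast<unsigned long>(info.msg_qbytes);
	out.messages = static_cast<unsigned long>(info.msg_qnum);
	return true;
}
```

A sender could call readQueueLimits() before queuing a large batch to estimate how many fixed-size messages will fit before msgsnd() starts to block.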

Header file:

#include<sys/msg.h>

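APIs:

int msgget(key_t key, int msgflag); => create or access a queue.

  • key is an integer that names the MQ; it must be unique among the queues on the running system.
  • msgflag holds the permission bits, optionally ORed with flags. 0666 grants read and write (6) to owner, group, and others. IPC_CREAT creates the MQ if it does not exist; if the MQ already exists, IPC_CREAT has no effect and the existing queue is returned.
  • Returns the queue ID on success and -1 on failure.

Example: int msgId = msgget((key_t)123456, 0666 | IPC_CREAT);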
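
Because IPC_CREAT silently returns an existing queue, a process cannot tell from the call above whether it created the MQ or attached to one that may already hold old messages. One way to find out, sketched here under the assumption that this distinction matters to the application, is to add IPC_EXCL and fall back to a plain attach. The function name openOrCreateQueue is hypothetical.

```cpp
#include <cerrno>
#include <sys/ipc.h>
#include <sys/msg.h>

// Hypothetical helper: returns the queue ID, or -1 on failure.
// 'created' is set to true only when this call created the queue.
int openOrCreateQueue(key_t key, bool& created)
{
	// IPC_EXCL makes msgget() fail with EEXIST if the key is already in use.
	int msgId = msgget(key, 0666 | IPC_CREAT | IPC_EXCL);
	if (msgId != -1)
	{
		created = true;
		return msgId;
	}
	created = false;
	if (errno != EEXIST)
	{
		return -1;  // a real failure, e.g. a resource limit was hit
	}
	// The queue already exists: attach to it without creating anything.
	return msgget(key, 0);
}
```

The second msgget() can still fail if another process removes the queue between the two calls, so the caller should check for -1 either way.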

int msgsnd(int msgId, const void* msgptr, size_t msgSize, int msgFlag);  => send a block of data.

  • msgId is the ID of the MQ returned by msgget(); it identifies the queue.
  • msgptr is a pointer to the message structure that we want to send.
//Message structure
struct my_msg
{
	long int type;    // Type of the message: mandatory first field, must be greater than 0.
	char text[gSize]; // Actual message: a char array here, but it could be an int
	                  // or another structure, depending on what we want to send.
};
  • msgSize is the size of the message payload; it does not include the “long int type” field.
  • msgFlag controls blocking. If IPC_NOWAIT is set and there is no space on the MQ, the call does not wait and returns -1 immediately (errno is EAGAIN). If IPC_NOWAIT is clear, the call blocks until space becomes available.
  • Returns 0 on success and -1 on failure.

Example: int ret = msgsnd(msgId, (void*)&data, gSize, 0);
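
The IPC_NOWAIT behaviour can be sketched as a small non-blocking send helper. This is an illustration only: text_msg, kTextSize, SendResult, and trySend are hypothetical names, and the payload size simply mirrors gSize from this article.

```cpp
#include <cerrno>
#include <cstring>
#include <sys/ipc.h>
#include <sys/msg.h>

static const int kTextSize = 512;  // same payload size as gSize in the article

struct text_msg
{
	long int type;
	char text[kTextSize];
};

enum class SendResult { Sent, QueueFull, Error };

// Hypothetical helper: tries to send without blocking.
SendResult trySend(int msgId, long int type, const char* text)
{
	text_msg msg;
	msg.type = type;  // must be greater than 0
	std::strncpy(msg.text, text, kTextSize - 1);
	msg.text[kTextSize - 1] = '\0';  // strncpy does not terminate on truncation

	// msgSize is the payload size only; the type field is not counted.
	if (msgsnd(msgId, &msg, sizeof(msg.text), IPC_NOWAIT) == 0)
	{
		return SendResult::Sent;
	}
	// With IPC_NOWAIT, a full queue is reported as EAGAIN instead of blocking.
	return (errno == EAGAIN) ? SendResult::QueueFull : SendResult::Error;
}
```

Returning QueueFull separately from Error lets the caller decide whether to retry later, drop the message, or fall back to a blocking send.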

int msgrcv(int msgId, void* msgptr, size_t msgSize, long int msgType, int msgFlag); => receive a block of data.

  • msgId is the ID of the MQ returned by msgget(); it identifies the queue.
  • msgptr is a pointer to the message structure that will receive the data.
  • msgSize is the maximum payload size to receive; it does not include the “long int type” field.
  • msgType is a long int that selects which message to retrieve, which allows a simple form of priority. It has three cases:
    • 0: the first available message on the queue is retrieved.
    • >0: the first message with exactly that type is retrieved; if the value is 5, only a message of type 5 is picked.
    • <0: the first message of the lowest type that is less than or equal to the absolute value is retrieved; if the value is -5, a message of type 5 or smaller is picked, lowest type first.
  • msgFlag controls blocking. If IPC_NOWAIT is set and no matching message is on the MQ, the call does not wait and returns -1 immediately (errno is ENOMSG). If IPC_NOWAIT is clear, the call blocks until a matching message arrives.
  • Returns the number of payload bytes received on success and -1 on failure.

Example: int ret = msgrcv(msgId, (void*)&data, gSize, msgToReceive, 0);
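
The three msgType cases are easier to see with a queue that holds several types at once. The sketch below is an assumption-laden illustration, not part of the article's programs: typed_msg, kTextSize, sendTyped, and receiveTyped are hypothetical names, and the receive uses IPC_NOWAIT so that a missing message is reported instead of blocking.

```cpp
#include <cerrno>
#include <cstring>
#include <string>
#include <sys/ipc.h>
#include <sys/msg.h>

static const int kTextSize = 64;  // small payload, chosen only for this sketch

struct typed_msg
{
	long int type;
	char text[kTextSize];
};

// Hypothetical helper: queues 'text' under the given type (> 0). True on success.
bool sendTyped(int msgId, long int type, const char* text)
{
	typed_msg msg;
	msg.type = type;
	std::strncpy(msg.text, text, kTextSize - 1);
	msg.text[kTextSize - 1] = '\0';
	return msgsnd(msgId, &msg, sizeof(msg.text), 0) == 0;
}

// Hypothetical helper: non-blocking receive using the msgType selection rules.
// Returns the type of the message received, or -1 if none matched or on error.
long int receiveTyped(int msgId, long int msgType, std::string& text)
{
	typed_msg msg;
	if (msgrcv(msgId, &msg, sizeof(msg.text), msgType, IPC_NOWAIT) == -1)
	{
		return -1;  // errno is ENOMSG when no matching message is queued
	}
	text = msg.text;
	return msg.type;
}
```

With messages of types 3, 1, and 2 queued in that order, receiveTyped(id, -2, s) returns the type 1 message, receiveTyped(id, 3, s) then returns the type 3 message, and receiveTyped(id, 0, s) returns the remaining type 2 message.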

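int msgctl(int msgId, int command, struct msqid_ds* buf); => control commands.

  • msgId is the ID of the MQ returned by msgget(); it identifies the queue.
  • command is the action we want to perform:
    • IPC_RMID: removes the MQ; buf is ignored.
    • IPC_SET: updates the settable fields of the MQ (owner, permissions, byte limit) from buf.
    • IPC_STAT: copies the current state of the MQ into buf.
  • buf points to a msqid_ds structure used by IPC_SET and IPC_STAT.

Example: int ret = msgctl(msgId, IPC_RMID, 0);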
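
IPC_STAT and IPC_SET are usually used together: read the current msqid_ds, change one field, and write it back. The sketch below tightens a queue's permission bits this way. The function names setQueueMode and getQueueMode are hypothetical, and the example assumes the caller owns the queue.

```cpp
#include <sys/ipc.h>
#include <sys/msg.h>

// Hypothetical helper: replaces the queue's permission bits (e.g. 0600).
// Returns true on success. The caller must own the queue or be privileged.
bool setQueueMode(int msgId, unsigned int mode)
{
	msqid_ds info;
	// IPC_STAT first, so the fields we do not change keep their current values.
	if (msgctl(msgId, IPC_STAT, &info) == -1)
	{
		return false;
	}
	info.msg_perm.mode = mode & 0777;  // only the low 9 permission bits are used
	return msgctl(msgId, IPC_SET, &info) == 0;
}

// Hypothetical helper: reads back the permission bits, or -1 on failure.
int getQueueMode(int msgId)
{
	msqid_ds info;
	if (msgctl(msgId, IPC_STAT, &info) == -1)
	{
		return -1;
	}
	return static_cast<int>(info.msg_perm.mode & 0777);
}
```

For a queue created with 0666, calling setQueueMode(msgId, 0600) would restrict it to its owner, which is one way to limit who can read messages after the queue has been set up.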

Source Code:

Sender Code:

#include<iostream>
#include<iomanip>
#include<string.h>
#include<sys/msg.h>

//MAX TEXT size
static const int gSize = 512;
bool gRunning = true;

//Message structure
struct my_msg
{
	long int type;
	char text[gSize];
};


int main()
{
	std::cout<<"In Main"<<std::endl;

	my_msg data;
	char buffer[gSize];

	//Create the MQ, or get the existing one if it already exists
	int msgId = msgget((key_t)123456, 0666 | IPC_CREAT);
	if(msgId == -1)
	{
		std::cout<<"MQ Creation Failed"<<std::endl;
		return -1;
	}

	while(gRunning)
	{
		std::cout<<"Enter the message that you want send or enter end to stop "<<std::endl;
		//Read one word; setw limits it to gSize - 1 characters so buffer cannot overflow
		if(!(std::cin>>std::setw(gSize)>>buffer))
		{
			break;  //stop on end of input or read error
		}
		data.type = 1;   //type of this message, must be greater than 0
		strcpy(data.text, buffer);

		//Send the message / drop the message in the queue
		//gSize is the payload size only; it does not count the type field
		if(msgsnd(msgId, (void*)&data, gSize, 0) == -1)
		{
			std::cout<<"MQ Send failed"<<std::endl;
			return -1;
		}

		if(strncmp(buffer, "end", 3) == 0)
		{
			gRunning = false;
		}
	}
	return 0;
}

Receiver Code:

#include<iostream>
#include<string.h>
#include<sys/msg.h>

//MAX TEXT size
static const int gSize = 512;
bool gRunning = true;

//Message structure
struct my_msg
{
	long int type;
	char text[gSize];
}; 


int main()
{
	std::cout<<"In Main"<<std::endl;

	my_msg data;
	long int msgToReceive = 0; //First available message on queue will be retrieved

	//Create the MQ, or get the existing one if it already exists
	int msgId = msgget((key_t)123456, 0666 | IPC_CREAT);
	if(msgId == -1)
	{
		std::cout<<"MQ Creation Failed"<<std::endl;
		return -1;
	}

	while(gRunning)
	{
		//Status line that appears in the output shown below
		std::cout<<"Reciever is waiting for message from MQ, IPC_NOWAIT is clear"<<std::endl;

		//Retrieve the message from MQ; blocks until one arrives because IPC_NOWAIT is clear
		if(msgrcv(msgId, (void*)&data, gSize, msgToReceive, 0) == -1)
		{
			std::cout<<"MQ Receive Failed"<<std::endl;
			return -1;
		}

		std::cout<<data.text<<std::endl;
		if(strncmp(data.text, "end", 3)==0)
		{
			gRunning = false;
		}
	}

	//Delete the MQ
	if(msgctl(msgId, IPC_RMID, 0) == -1)
	{
		std::cout<<"MQ delete Failed"<<std::endl;
		return -1;
	}

	return 0;
}

Output with the commands used:

First, start the receiver executable as a background task.

alpha:~/IPC/MQ$ ls
reciever.cpp  sender.cpp
alpha:~/IPC/MQ$ g++ -o rec reciever.cpp 
alpha:~/IPC/MQ$ ls
rec  reciever.cpp  sender.cpp
alpha:~/IPC/MQ$ ./rec &
[1] 8551
alpha:~/IPC/MQ$ In Main
Reciever is waiting for message from MQ, IPC_NOWAIT is clear
DigestCPP
Reciever is waiting for message from MQ, IPC_NOWAIT is clear
CPP
Reciever is waiting for message from MQ, IPC_NOWAIT is clear
end
[1]+  Done                    ./rec

Second, start the sender executable in another terminal.

alpha:~$ cd IPC/MQ
alpha:~/IPC/MQ$ g++ -o sen sender.cpp 
alpha:~/IPC/MQ$ ls
rec  reciever.cpp  sen  sender.cpp
alpha:~/IPC/MQ$ ./sen 
In Main
Enter the message that you want send or enter end to stop 
DigestCPP
Enter the message that you want send or enter end to stop 
CPP
Enter the message that you want send or enter end to stop 
end
alpha:~/IPC/MQ$ 

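The MQ exists independently in the system, and its lifetime does not depend on the lifetime of the sender or the receiver, so we can check its status and remove it with the commands below:

ipcs -q

ipcrm -q msqid

s is for “status”

q is for “queue”

rm is for “remove”

In the listing below, the key 0x0001e240 is 123456 in hexadecimal, and the queue holds two unread messages of 512 bytes each.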

alpha:~/IPC/MQ$ ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0x0001e240 98304      alpha     666        1024         2           

alpha:~/IPC/MQ$ ipcrm -q 98304
alpha:~/IPC/MQ$ ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    

alpha:~/IPC/MQ$ 

DigestCPP © 2023. All rights reserved.