Server
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <sys/msg.h>
#include "msgdefs.h"
#define OK 0
#define LOG_ERROR(msg) { printf("%s:%u: %s %s\n", \
__FUNCTION__, __LINE__, msg, \
strerror(errno)); }
int msgQueue;
int CreateMsgQueue(long id)
{
int msgId;
key_t key;
struct msqid_ds ds;
key = ftok("msgserv.c", id);
msgId = msgget(key, 0644 | IPC_CREAT);
if (msgId < 0)
{
LOG_ERROR("msgget");
}
if (EEXIST == errno)
{
// reuse
msgctl(msgId, IPC_STAT, &ds);
key = ftok("msgserv.key", id);
msgId = msgget(key, 0777 | IPC_SET);
}
return msgId;
}
int SendMsg(void *msg, size_t msgsz, int msgflg)
{
Msg_t *pMsg;
char *pData;
int retval = 0;
pMsg = malloc(sizeof(Msg_t) + msgsz);
pMsg->hdr.msgType = 1;
pMsg->hdr.dataSize = msgsz;
pData = (char*)(&pMsg->data[0]);
memcpy(pData, msg, msgsz);
retval = msgsnd(msgQueue, pMsg, msgsz, msgflg);
free(pMsg);
return retval;
}
void *SendMsgTask(void *arg)
{
Msg_t msg;
long tick = 0;
if (arg)
printf("This is task %d\n", *(int *) arg);
else
printf("This is task\n");
#if 1
do {
// send msg
printf("Sending tick=%lu\n", tick);
if (OK != SendMsg(&tick, sizeof(tick), 0))
{
LOG_ERROR("SendMsg failed");
}
tick++;
usleep(500000);
//sleep(1);
} while (tick < 50);
//send quit
msg.hdr.msgType = 2;
msgsnd(msgQueue, &msg, sizeof(msg), 0);
return NULL;
#endif
}
int main()
{
int retval;
pthread_t SendMsgTaskId;
pthread_attr_t attr;
pthread_attr_init(&attr);
msgQueue = CreateMsgQueue(MESSAGE1);
if (msgQueue > 0) {
printf("Start the thread..\n");
retval = pthread_create(&SendMsgTaskId, &attr, SendMsgTask, NULL);
if (OK == retval)
pthread_join(SendMsgTaskId, NULL);
}
else {
LOG_ERROR("CreateMsgQueue");
}
if (OK != msgctl(msgQueue, IPC_RMID, NULL))
{
LOG_ERROR("msgctl");
exit(1);
}
pthread_attr_destroy(&attr);
return 0;
}
Client
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <sys/msg.h>
#include "msgdefs.h"
#define OK 0
#define LOG_ERROR(msg) { printf("%s:%u: %s %s\n", \
__FUNCTION__, __LINE__, msg, \
strerror(errno)); }
int msgQueue;
int InitMsgQueue(long id)
{
int msgId;
key_t key;
key = ftok("msgserv.c", id);
msgId = msgget(key, 0644);
if (OK != msgId)
{
LOG_ERROR("msgget");
}
return msgId;
}
int ReceiveMsg(Msg_t **pMsg, size_t *msgsz)
{
int sz;
int retval = 0;
sz = 256;
*pMsg = malloc(sizeof(Msg_t) + sz);
retval = msgrcv(msgQueue, *pMsg, sz, 0, 0);
*msgsz = sz;
return retval;
}
void *ReceiveMsgTask(void *arg)
{
Msg_t *pMsg;
size_t sz;
if (arg)
printf("This is task %d\n", *(int *) arg);
else
printf("This is task\n");
for(;;) {
// rcv msg
if (ReceiveMsg(&pMsg, &sz) < 0)
{
LOG_ERROR("ReceiveMsg");
return NULL;
}
else {
printf("MsgType = %lu\n", pMsg->hdr.msgType);
switch (pMsg->hdr.msgType) {
case 1:
printf("Received sz=%u, tick=%lu\n",
pMsg->hdr.dataSize,
*(long*)(&pMsg->data[0]));
break;
default:
printf("End of task\n");
free(pMsg);
return NULL;
}
}
// msg must be freed
free(pMsg);
}
return NULL;
}
int main()
{
int retval;
pthread_t ReceiveMsgTaskId;
pthread_attr_t attr;
pthread_attr_init(&attr);
msgQueue = InitMsgQueue(MESSAGE1);
if (msgQueue > 0) {
printf("Start the thread..\n");
retval = pthread_create(&ReceiveMsgTaskId, &attr, ReceiveMsgTask, NULL);
if (OK == retval)
pthread_join(ReceiveMsgTaskId, NULL);
}
else {
LOG_ERROR("InitMsgQueue");
}
pthread_attr_destroy(&attr);
return 0;
}
msgdefs.h
#ifndef __MSGDEFS_H__
#define __MSGDEFS_H__
enum {
MESSAGE1 = 0x10000000
};
#define MSG_DATA_SIZE 1024
typedef struct {
long msgType;
unsigned int dataSize;
} MsgHdr_t;
typedef struct {
MsgHdr_t hdr; /* our header */
char data[0]; // just a place holder
} Msg_t;
#endif