1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  * Copyright (C) 2016 Mopria Alliance, Inc.
4  * Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef _GNU_SOURCE
20 #define _GNU_SOURCE
21 #endif
22 
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <pthread.h>
27 #include <semaphore.h>
28 
29 #include "wprint_msgq.h"
30 #include "wprint_debug.h"
31 
32 #define TAG "wprint_msgq"
33 
34 #define _SEM_NAME_LENGTH    16
35 
36 typedef struct {
37     msg_q_id msgq_id;
38     char name[_SEM_NAME_LENGTH];
39     int max_msgs;
40     int max_msg_length;
41     int num_msgs;
42     sem_t sem_count;
43     sem_t *sem_ptr;
44     pthread_mutex_t mutex;
45     pthread_mutexattr_t mutexattr;
46     unsigned long read_offset;
47     unsigned long write_offset;
48 } _msgq_hdr_t;
49 
msgQCreate(int max_msgs,int max_msg_length)50 msg_q_id msgQCreate(int max_msgs, int max_msg_length) {
51     _msgq_hdr_t *msgq;
52     int msgq_size;
53 
54     msgq_size = sizeof(_msgq_hdr_t) + max_msgs * max_msg_length;
55     msgq = (_msgq_hdr_t *) malloc((size_t)msgq_size);
56 
57     if (msgq) {
58         memset((char *) msgq, 0, (size_t)msgq_size);
59         msgq->msgq_id = (msg_q_id) msgq;
60         msgq->max_msgs = max_msgs;
61         msgq->max_msg_length = max_msg_length;
62         msgq->num_msgs = 0;
63 
64         // create a mutex to protect access to this structure
65         pthread_mutexattr_init(&(msgq->mutexattr));
66         pthread_mutexattr_settype(&(msgq->mutexattr), PTHREAD_MUTEX_RECURSIVE_NP);
67         pthread_mutex_init(&msgq->mutex, &msgq->mutexattr);
68 
69         // create a counting semaphore
70         msgq->sem_ptr = &msgq->sem_count;
71         sem_init(msgq->sem_ptr, 0, 0); // PRIVATE, EMPTY
72 
73         msgq->read_offset = 0;
74         msgq->write_offset = 0;
75     }
76     return ((msg_q_id) msgq);
77 }
78 
msgQDelete(msg_q_id msgQ)79 status_t msgQDelete(msg_q_id msgQ) {
80     _msgq_hdr_t *msgq = (msg_q_id) msgQ;
81 
82     if (msgq) {
83         pthread_mutex_lock(&(msgq->mutex));
84         if (msgq->num_msgs) {
85             LOGE("Warning msgQDelete() called on queue with %d messages", msgq->num_msgs);
86         }
87 
88         sem_destroy(&(msgq->sem_count));
89         pthread_mutex_unlock(&(msgq->mutex));
90         pthread_mutex_destroy(&(msgq->mutex));
91         free((void *) msgq);
92     }
93     return (msgq ? OK : ERROR);
94 }
95 
msgQSend(msg_q_id msgQ,const char * buffer,unsigned long nbytes,int timeout,int priority)96 status_t msgQSend(msg_q_id msgQ, const char *buffer, unsigned long nbytes, int timeout,
97         int priority) {
98     _msgq_hdr_t *msgq = (msg_q_id) msgQ;
99     char *msg_loc;
100     status_t result = ERROR;
101 
102     // validate function arguments
103     if (msgq && (timeout == NO_WAIT) && (priority == MSG_Q_FIFO)) {
104         pthread_mutex_lock(&(msgq->mutex));
105 
106         // ensure the message conforms to size limits and there is room in the msgQ
107         if ((nbytes <= msgq->max_msg_length) && (msgq->num_msgs < msgq->max_msgs)) {
108             msg_loc = (char *) msgq + sizeof(_msgq_hdr_t) +
109                     (msgq->write_offset * msgq->max_msg_length);
110             memcpy(msg_loc, buffer, nbytes);
111             msgq->write_offset = (msgq->write_offset + 1) % msgq->max_msgs;
112             msgq->num_msgs++;
113             sem_post(msgq->sem_ptr);
114             result = OK;
115         }
116 
117         pthread_mutex_unlock(&(msgq->mutex));
118     }
119     return result;
120 }
121 
msgQReceive(msg_q_id msgQ,char * buffer,unsigned long max_nbytes,int timeout)122 status_t msgQReceive(msg_q_id msgQ, char *buffer, unsigned long max_nbytes, int timeout) {
123     _msgq_hdr_t *msgq = (msg_q_id) msgQ;
124     char *msg_loc;
125     status_t result = ERROR;
126 
127     if (msgq && buffer && ((timeout == WAIT_FOREVER) || (timeout == NO_WAIT))) {
128         if (timeout == WAIT_FOREVER) {
129             result = (status_t) sem_wait(msgq->sem_ptr);
130         } else {
131             /* timeout is NO_WAIT */
132             result = (status_t) sem_trywait(msgq->sem_ptr);
133         }
134 
135         if (result == 0) {
136             pthread_mutex_lock(&(msgq->mutex));
137 
138             msg_loc = (char *) msgq + sizeof(_msgq_hdr_t) +
139                     (msgq->read_offset * msgq->max_msg_length);
140             memcpy(buffer, msg_loc, max_nbytes);
141             msgq->read_offset = (msgq->read_offset + 1) % msgq->max_msgs;
142             msgq->num_msgs--;
143             pthread_mutex_unlock(&(msgq->mutex));
144         }
145     }
146     return result;
147 }
148 
/*
 * Reports how many messages are currently waiting in the queue.
 * The count is read while holding the queue's mutex.
 *
 * Returns the number of queued messages, or -1 when msgQ is NULL.
 */
int msgQNumMsgs(msg_q_id msgQ) {
    _msgq_hdr_t *queue = (msg_q_id) msgQ;
    int pending;

    if (!queue) {
        return -1;
    }

    pthread_mutex_lock(&queue->mutex);
    pending = queue->num_msgs;
    pthread_mutex_unlock(&queue->mutex);

    return pending;
}
160