/*
 * Copyright (C) 2016 The Android Open Source Project
 * Copyright (C) 2016 Mopria Alliance, Inc.
 * Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19 #ifndef _GNU_SOURCE
20 #define _GNU_SOURCE
21 #endif
22
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "wprint_msgq.h"
#include "wprint_debug.h"
30
31 #define TAG "wprint_msgq"
32
33 #define _SEM_NAME_LENGTH 16
34
35 typedef struct {
36 msg_q_id msgq_id;
37 char name[_SEM_NAME_LENGTH];
38 int max_msgs;
39 int max_msg_length;
40 int num_msgs;
41 sem_t sem_count;
42 sem_t *sem_ptr;
43 pthread_mutex_t mutex;
44 pthread_mutexattr_t mutexattr;
45 unsigned long read_offset;
46 unsigned long write_offset;
47 } _msgq_hdr_t;
48
msgQCreate(int max_msgs,int max_msg_length)49 msg_q_id msgQCreate(int max_msgs, int max_msg_length) {
50 _msgq_hdr_t *msgq;
51 int msgq_size;
52
53 msgq_size = sizeof(_msgq_hdr_t) + max_msgs * max_msg_length;
54 msgq = (_msgq_hdr_t *) malloc((size_t)msgq_size);
55
56 if (msgq) {
57 memset((char *) msgq, 0, (size_t)msgq_size);
58 msgq->msgq_id = (msg_q_id) msgq;
59 msgq->max_msgs = max_msgs;
60 msgq->max_msg_length = max_msg_length;
61 msgq->num_msgs = 0;
62
63 // create a mutex to protect access to this structure
64 pthread_mutexattr_init(&(msgq->mutexattr));
65 pthread_mutexattr_settype(&(msgq->mutexattr), PTHREAD_MUTEX_RECURSIVE_NP);
66 pthread_mutex_init(&msgq->mutex, &msgq->mutexattr);
67
68 // create a counting semaphore
69 msgq->sem_ptr = &msgq->sem_count;
70 sem_init(msgq->sem_ptr, 0, 0); // PRIVATE, EMPTY
71
72 msgq->read_offset = 0;
73 msgq->write_offset = 0;
74 }
75 return ((msg_q_id) msgq);
76 }
77
msgQDelete(msg_q_id msgQ)78 status_t msgQDelete(msg_q_id msgQ) {
79 _msgq_hdr_t *msgq = (msg_q_id) msgQ;
80
81 if (msgq) {
82 pthread_mutex_lock(&(msgq->mutex));
83 if (msgq->num_msgs) {
84 LOGE("Warning msgQDelete() called on queue with %d messages", msgq->num_msgs);
85 }
86
87 sem_destroy(&(msgq->sem_count));
88 pthread_mutex_unlock(&(msgq->mutex));
89 pthread_mutex_destroy(&(msgq->mutex));
90 free((void *) msgq);
91 }
92 return (msgq ? OK : ERROR);
93 }
94
msgQSend(msg_q_id msgQ,const char * buffer,unsigned long nbytes,int timeout,int priority)95 status_t msgQSend(msg_q_id msgQ, const char *buffer, unsigned long nbytes, int timeout,
96 int priority) {
97 _msgq_hdr_t *msgq = (msg_q_id) msgQ;
98 char *msg_loc;
99 status_t result = ERROR;
100
101 // validate function arguments
102 if (msgq && (timeout == NO_WAIT) && (priority == MSG_Q_FIFO)) {
103 pthread_mutex_lock(&(msgq->mutex));
104
105 // ensure the message conforms to size limits and there is room in the msgQ
106 if ((nbytes <= msgq->max_msg_length) && (msgq->num_msgs < msgq->max_msgs)) {
107 msg_loc = (char *) msgq + sizeof(_msgq_hdr_t) +
108 (msgq->write_offset * msgq->max_msg_length);
109 memcpy(msg_loc, buffer, nbytes);
110 msgq->write_offset = (msgq->write_offset + 1) % msgq->max_msgs;
111 msgq->num_msgs++;
112 sem_post(msgq->sem_ptr);
113 result = OK;
114 }
115
116 pthread_mutex_unlock(&(msgq->mutex));
117 }
118 return result;
119 }
120
msgQReceive(msg_q_id msgQ,char * buffer,unsigned long max_nbytes,int timeout)121 status_t msgQReceive(msg_q_id msgQ, char *buffer, unsigned long max_nbytes, int timeout) {
122 _msgq_hdr_t *msgq = (msg_q_id) msgQ;
123 char *msg_loc;
124 status_t result = ERROR;
125
126 if (msgq && buffer && ((timeout == WAIT_FOREVER) || (timeout == NO_WAIT))) {
127 if (timeout == WAIT_FOREVER) {
128 result = (status_t) sem_wait(msgq->sem_ptr);
129 } else {
130 /* timeout is NO_WAIT */
131 result = (status_t) sem_trywait(msgq->sem_ptr);
132 }
133
134 if (result == 0) {
135 pthread_mutex_lock(&(msgq->mutex));
136
137 msg_loc = (char *) msgq + sizeof(_msgq_hdr_t) +
138 (msgq->read_offset * msgq->max_msg_length);
139 memcpy(buffer, msg_loc, max_nbytes);
140 msgq->read_offset = (msgq->read_offset + 1) % msgq->max_msgs;
141 msgq->num_msgs--;
142 pthread_mutex_unlock(&(msgq->mutex));
143 }
144 }
145 return result;
146 }
147
/*
 * Reports how many messages are currently stored in the queue.
 *
 * msgQ:  handle returned by msgQCreate()
 *
 * Returns the message count, read while holding the queue mutex, or -1 when
 * msgQ is NULL.
 */
int msgQNumMsgs(msg_q_id msgQ) {
    _msgq_hdr_t *queue = (_msgq_hdr_t *) msgQ;
    int count;

    if (queue == NULL) {
        return -1;
    }

    pthread_mutex_lock(&queue->mutex);
    count = queue->num_msgs;
    pthread_mutex_unlock(&queue->mutex);

    return count;
}