base/ipc/mq/mq.c

Go to the documentation of this file.
00001 /* 00002 * pqueues interface for Real Time Linux. 00003 * 00004 * Copyright (©) 1999 Zentropic Computing, All rights reserved 00005 * 00006 * Authors: Trevor Woolven (trevw@zentropix.com) 00007 * 00008 * Original date: Thu 15 Jul 1999 00009 * 00010 * This program is free software; you can redistribute it and/or 00011 * modify it under the terms of the GNU General Public License as 00012 * published by the Free Software Foundation; either version 2 of the 00013 * License, or (at your option) any later version. 00014 * 00015 * This program is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 * GNU General Public License for more details. 00019 * 00020 * You should have received a copy of the GNU General Public License 00021 * along with this program; if not, write to the Free Software 00022 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 00023 * 00024 * Timed services extension and user space integration for RTAI by 00025 * Paolo Mantegazza <mantegazza@aero.polimi.it>. 00026 * 2005, cleaned and revised Paolo Mantegazza <mantegazza@aero.polimi.it>. 00027 * 00028 */ 00029 00030 #include <linux/module.h> 00031 #include <linux/kernel.h> 00032 #include <linux/version.h> 00033 #include <linux/errno.h> 00034 #include <linux/stat.h> 00035 #include <asm/uaccess.h> 00036 #ifdef CONFIG_PROC_FS 00037 #include <linux/proc_fs.h> 00038 extern struct proc_dir_entry *rtai_proc_root; 00039 #endif 00040 #include <rtai_schedcore.h> 00041 #include <rtai_proc_fs.h> 00042 00043 MODULE_LICENSE("GPL"); 00044 00045 #define pthread_cond_t SEM 00046 #define pthread_mutex_t SEM 00047 #define pthread_mutex_init(mutex, attr) rt_mutex_init(mutex) 00048 #define pthread_mutex_unlock rt_mutex_unlock 00049 #define pthread_mutex_lock rt_mutex_lock 00050 #define pthread_mutex_destroy rt_mutex_destroy 00051 #define pthread_cond_init(cond, attr) rt_cond_init(cond) 00052 #define pthread_cond_wait rt_cond_wait 00053 #define pthread_cond_signal rt_cond_signal 00054 #define pthread_cond_destroy rt_cond_destroy 00055 00056 #ifndef OK 00057 #define OK 0 00058 #endif 00059 #ifndef ERROR 00060 #define ERROR -1 00061 #endif 00062 00063 /////////////////////////////////////////////////////////////////////////////// 00064 // LOCAL DEFINITIONS 00065 /////////////////////////////////////////////////////////////////////////////// 00066 00067 #define MAX_RT_TASKS 128 00068 00069 /////////////////////////////////////////////////////////////////////////////// 00070 // PACKAGE GLOBAL DATA 00071 /////////////////////////////////////////////////////////////////////////////// 00072 00073 static int num_pqueues = 0; 00074 static struct _pqueue_descr_struct rt_pqueue_descr[MAX_PQUEUES] = {{0}}; 00075 static struct _pqueue_access_struct task_pqueue_access[MAX_RT_TASKS] = {{0}}; 00076 static MQ_ATTR default_queue_attrs = { MAX_MSGS, MAX_MSGSIZE, MQ_NONBLOCK, 0 }; 00077 00078 static pthread_mutex_t pqueue_mutex; 00079 00080 /////////////////////////////////////////////////////////////////////////////// 00081 // LOCAL FUNCTIONS 00082 /////////////////////////////////////////////////////////////////////////////// 00083 00084 static int name_to_id(char *name) 00085 { 00086 int ind; 00087 for (ind = 0; ind < MAX_PQUEUES; ind++) { 00088 if ((strcmp(rt_pqueue_descr[ind].q_name, "") != 0) && 00089 (strcmp(rt_pqueue_descr[ind].q_name, name) == 0)) { 00090 return ind; 00091 } 00092 } 00093 return ERROR; 00094 } 00095 00096 00097 static inline mq_bool_t is_empty(struct queue_control *q) 00098 { 00099 return q->attrs.mq_curmsgs == 0 ? TRUE : FALSE; 00100 } 00101 00102 00103 static inline mq_bool_t is_full(struct queue_control *q) 00104 { 00105 return q->attrs.mq_curmsgs == q->attrs.mq_maxmsg ? TRUE : FALSE; 00106 } 00107 00108 00109 static inline MSG_HDR* getnode(Q_CTRL *queue) 00110 { 00111 return queue->nodind < queue->attrs.mq_maxmsg ? queue->nodes[queue->nodind++] : NULL; 00112 } 00113 00114 static inline int freenode(void *node, Q_CTRL *queue) 00115 { 00116 if (queue->nodind > 0) { 00117 queue->nodes[--queue->nodind] = node; 00118 return 0; 00119 } 00120 return -EINVAL; 00121 } 00122 00123 00124 static void insert_message(Q_CTRL *q, MSG_HDR *this_msg) 00125 { 00126 //This function finds the appropriate point in a priority queue to 00127 //insert the supplied message. It preserves FIFO order within each 00128 //priority levela and can therefore be used for FIFO queuing policies 00129 //simply by making the priority equal to the supplied message priority 00130 00131 MSG_HDR *prev, *insertpt; 00132 00133 //Do a quick check in case the message at the back of the queue has 00134 //a higher priority than this one, in which case this one can just 00135 //go at the back of the queue. 00136 //Remember that Posix priorities increase from 0 to (at least) 32 00137 00138 if (((MSG_HDR *)q->tail)->priority >= this_msg->priority) { 00139 ((MSG_HDR*)q->tail)->next = this_msg; 00140 q->tail = this_msg; 00141 } else { 00142 prev = insertpt = q->head; 00143 //POSIX queues preserve FIFO ordering of messages within 00144 //a particular priority level 00145 while (insertpt->priority >= this_msg->priority) { 00146 prev = insertpt; 00147 insertpt = insertpt->next; 00148 } 00149 //We've now found a message (or messages) of equal or lower 00150 //priority than the one we're trying to put onto the queue 00151 if (insertpt == q->head) { 00152 this_msg->next = q->head; 00153 q->head = this_msg; 00154 } else { 00155 this_msg->next = prev->next; 00156 prev->next = this_msg; 00157 } 00158 } 00159 } 00160 00161 #undef mqueues 00162 #define mqueues system_data_ptr 00163 00164 static mq_bool_t is_blocking(MSG_QUEUE *q) 00165 { 00166 int q_ind; 00167 struct _pqueue_access_data *aces; 00168 00169 aces = ((QUEUE_CTRL)_rt_whoami()->mqueues)->q_access; 00170 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) { 00171 if (aces[q_ind].q_id == q->q_id) { 00172 return (aces[q_ind].oflags & O_NONBLOCK) ? FALSE : TRUE; 00173 } 00174 } 00175 return FALSE; 00176 } 00177 00178 00179 static mq_bool_t can_access(MSG_QUEUE *q, Q_ACCESS access) 00180 { 00181 RT_TASK *caller = _rt_whoami(); 00182 00183 if (q->owner == caller ? (((access == FOR_READ) && (q->permissions & S_IRUSR)) || ((access == FOR_WRITE) && (q->permissions & S_IWUSR))) : (((access == FOR_READ) && (q->permissions & S_IRGRP)) || ((access == FOR_WRITE) && (q->permissions & S_IWGRP)))) { 00184 int q_ind; 00185 struct _pqueue_access_data *aces; 00186 struct _pqueue_access_struct *task_queue_data_ptr; 00187 int q_access_flags = 0; 00188 00189 task_queue_data_ptr = (QUEUE_CTRL)caller->mqueues; 00190 if (task_queue_data_ptr == NULL) { 00191 return FALSE; 00192 } 00193 aces = task_queue_data_ptr->q_access; 00194 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) { 00195 if (aces[q_ind].q_id == q->q_id) { 00196 q_access_flags = aces[q_ind].oflags; 00197 goto set_mode; 00198 } 00199 } 00200 return FALSE; 00201 set_mode: if (access == FOR_WRITE) { 00202 if ((q_access_flags & O_WRONLY) || (q_access_flags & O_RDWR)) { 00203 return TRUE; 00204 } 00205 } else { 00206 return TRUE; 00207 } 00208 } 00209 return FALSE; 00210 } 00211 00212 00213 static inline void initialise_queue(Q_CTRL *q) 00214 { 00215 int msg_size, msg_ind; 00216 void *msg_ptr; 00217 00218 msg_size = q->attrs.mq_msgsize + sizeof(MSG_HDR); 00219 msg_ptr = q->base; 00220 q->nodind = 0; 00221 q->nodes = msg_ptr + msg_size*q->attrs.mq_maxmsg; 00222 for (msg_ind = 0; msg_ind < q->attrs.mq_maxmsg; msg_ind++) { 00223 q->nodes[msg_ind] = msg_ptr; 00224 ((MSG_HDR *)msg_ptr)->size = 0; 00225 ((MSG_HDR *)msg_ptr)->priority = MQ_MIN_MSG_PRIORITY; 00226 ((MSG_HDR *)msg_ptr)->next = NULL; 00227 msg_ptr += msg_size; 00228 } 00229 } 00230 00231 00232 static void delete_queue(int q_index) 00233 { 00234 rt_free(rt_pqueue_descr[q_index].data.base); 00235 00236 rt_pqueue_descr[q_index].owner = NULL; 00237 rt_pqueue_descr[q_index].open_count = 0; 00238 strcpy(rt_pqueue_descr[q_index].q_name, "\0"); 00239 rt_pqueue_descr[q_index].q_id = INVALID_PQUEUE; 00240 rt_pqueue_descr[q_index].data.base = NULL; 00241 rt_pqueue_descr[q_index].data.head = NULL; 00242 rt_pqueue_descr[q_index].data.tail = NULL; 00243 rt_pqueue_descr[q_index].data.attrs = (MQ_ATTR){ 0, 0, 0, 0 }; 00244 rt_pqueue_descr[q_index].permissions = 0; 00245 00246 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00247 pthread_mutex_destroy(&rt_pqueue_descr[q_index].mutex); 00248 pthread_cond_destroy(&rt_pqueue_descr[q_index].emp_cond); 00249 pthread_cond_destroy(&rt_pqueue_descr[q_index].full_cond); 00250 00251 if (num_pqueues > 0) { 00252 num_pqueues--; 00253 } 00254 } 00255 00256 /////////////////////////////////////////////////////////////////////////////// 00257 // POSIX MESSAGE QUEUES API 00258 /////////////////////////////////////////////////////////////////////////////// 00259 00260 mqd_t mq_open(char *mq_name, int oflags, mode_t permissions, struct mq_attr *mq_attr) 00261 { 00262 int q_index, t_index, q_ind; 00263 int spare_count = 0, first_spare = 0; 00264 mq_bool_t q_found = FALSE; 00265 RT_TASK *this_task = _rt_whoami(); 00266 struct _pqueue_access_struct *task_data_ptr; 00267 00268 task_data_ptr = (QUEUE_CTRL)this_task->mqueues; 00269 00270 pthread_mutex_lock(&pqueue_mutex); 00271 if ((q_index = name_to_id(mq_name)) >= 0) { 00272 //======================== 00273 // OPEN AN EXISTING QUEUE 00274 //======================== 00275 if ((oflags & O_CREAT) && (oflags & O_EXCL)) { 00276 pthread_mutex_unlock(&pqueue_mutex); 00277 return -EEXIST; 00278 } 00279 if (task_data_ptr == NULL) { 00280 for (t_index = 0; t_index < MAX_RT_TASKS; t_index++) { 00281 if (task_pqueue_access[t_index].this_task == NULL) { 00282 task_data_ptr = &(task_pqueue_access[t_index]); 00283 task_data_ptr->this_task = this_task; 00284 this_task->mqueues = task_data_ptr; 00285 break; 00286 } 00287 } 00288 if (t_index == MAX_RT_TASKS) { 00289 pthread_mutex_unlock(&pqueue_mutex); 00290 return -ENOMEM; 00291 } 00292 } 00293 //Now record that this task has opened this queue and 00294 //the access permissions required 00295 //Check first to see if this task has already opened this queue 00296 //and while doing so, record the number of spare 'slots' for this 00297 //task to have further opened queues 00298 pthread_mutex_lock(&rt_pqueue_descr[q_index].mutex); 00299 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) { 00300 if (task_data_ptr->q_access[q_ind].q_id == rt_pqueue_descr[q_index].q_id) { 00301 q_found = TRUE; 00302 break; 00303 } else if(task_data_ptr->q_access[q_ind].q_id == INVALID_PQUEUE) { 00304 if (spare_count == 0) { 00305 first_spare = q_ind; 00306 } 00307 spare_count++; 00308 } 00309 } 00310 //If the task has not already opened this queue and there are no 00311 //more available slots, can't do anymore... 00312 if (!q_found && spare_count == 0) { 00313 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00314 pthread_mutex_unlock(&pqueue_mutex); 00315 return -EINVAL; 00316 } 00317 //Either the queue has already been opened and so we can re-use 00318 //it's slot, or a new one is being opened in an unused slot 00319 if (!q_found) { 00320 //Open a new one, using the first free slot 00321 task_data_ptr->n_open_pqueues++; 00322 q_ind = first_spare; 00323 } 00324 task_data_ptr->q_access[q_ind].q_id = rt_pqueue_descr[q_index].q_id; 00325 task_data_ptr->q_access[q_ind].oflags = oflags; 00326 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00327 } else if (oflags & O_CREAT) { 00328 //================ 00329 // CREATE A QUEUE 00330 //================ 00331 if(num_pqueues >= MAX_PQUEUES) { 00332 pthread_mutex_unlock(&pqueue_mutex); 00333 return -ENOMEM; 00334 } 00335 //Check the size of the name 00336 if( strlen(mq_name) >= MQ_NAME_MAX) { 00337 pthread_mutex_unlock(&pqueue_mutex); 00338 return -ENAMETOOLONG; 00339 } 00340 //Allocate a task pqueue access structure to this task, if necessary. 00341 //Otherwise, check that this task has not already opened too many 00342 //queues 00343 // 00344 if (task_data_ptr == NULL) { 00345 //Find a spare task pqueue access slot for this task 00346 for (t_index = 0; t_index < MAX_RT_TASKS; t_index++) { 00347 if (task_pqueue_access[t_index].this_task == NULL) { 00348 task_data_ptr = &task_pqueue_access[t_index]; 00349 task_data_ptr->this_task = this_task; 00350 this_task->mqueues = task_data_ptr; 00351 break; 00352 } 00353 } 00354 if (t_index == MAX_RT_TASKS) { 00355 pthread_mutex_unlock(&pqueue_mutex); 00356 return -ENOMEM; 00357 } 00358 } else if (task_data_ptr->n_open_pqueues >= MQ_OPEN_MAX) { 00359 pthread_mutex_unlock(&pqueue_mutex); 00360 return -EINVAL; 00361 } 00362 //Look for default queue attributes 00363 if (mq_attr == NULL) { 00364 mq_attr = &default_queue_attrs; 00365 } 00366 //Find a spare descriptor for this queue 00367 for (q_index = 0; q_index < MAX_PQUEUES; q_index++) { 00368 if (rt_pqueue_descr[q_index].q_id == INVALID_PQUEUE) { 00369 int msg_size, queue_size; 00370 void *mem_ptr; 00371 //Get memory for the data queue 00372 msg_size = mq_attr->mq_msgsize + sizeof(MSG_HDR); 00373 queue_size = (msg_size + sizeof(void *))*mq_attr->mq_maxmsg; 00374 mem_ptr = rt_malloc(queue_size); 00375 if(mem_ptr == NULL) { 00376 pthread_mutex_unlock(&pqueue_mutex); 00377 return -ENOMEM; 00378 } 00379 rt_pqueue_descr[q_index].data.base = mem_ptr; 00380 //Initialise the Message Queue descriptor 00381 rt_pqueue_descr[q_index].owner = this_task; 00382 rt_pqueue_descr[q_index].open_count = 0; 00383 strcpy(rt_pqueue_descr[q_index].q_name, mq_name); 00384 rt_pqueue_descr[q_index].q_id = q_index + 1; 00385 rt_pqueue_descr[q_index].marked_for_deletion = FALSE; 00386 rt_pqueue_descr[q_index].data.head = 00387 rt_pqueue_descr[q_index].data.tail = rt_pqueue_descr[q_index].data.base; 00388 rt_pqueue_descr[q_index].data.attrs = *(mq_attr); 00389 rt_pqueue_descr[q_index].data.attrs.mq_curmsgs = 0; 00390 rt_pqueue_descr[q_index].permissions = permissions; 00391 //Initialise conditional variables used for blocking 00392 pthread_cond_init(&rt_pqueue_descr[q_index].emp_cond, NULL); 00393 pthread_cond_init(&rt_pqueue_descr[q_index].full_cond, NULL); 00394 pthread_mutex_init(&rt_pqueue_descr[q_index].mutex, NULL); 00395 00396 //Clear the queue contents 00397 initialise_queue(&rt_pqueue_descr[q_index].data); 00398 //Initialise the Task Queue access descriptor 00399 q_ind = task_data_ptr->n_open_pqueues++; 00400 task_data_ptr->q_access[q_ind].q_id = q_index + 1; 00401 task_data_ptr->q_access[q_ind].oflags = oflags; 00402 break; 00403 } 00404 } 00405 if(q_index >= MAX_PQUEUES) { 00406 pthread_mutex_unlock(&pqueue_mutex); 00407 return -EMFILE; 00408 } 00409 num_pqueues++; 00410 } else { 00411 //============================== 00412 // OPENING A NON-EXISTANT QUEUE 00413 //============================== 00414 pthread_mutex_unlock(&pqueue_mutex); 00415 return -ENOENT; 00416 } 00417 00418 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_OPEN, rt_pqueue_descr[q_index].q_id, 0, 0); 00419 00420 // Return the message queue's id and mark it as open 00421 rt_pqueue_descr[q_index].open_count++; 00422 pthread_mutex_unlock(&pqueue_mutex); 00423 return (mqd_t)rt_pqueue_descr[q_index].q_id; 00424 } 00425 00426 00427 size_t _mq_receive(mqd_t mq, char *msg_buffer, size_t buflen, unsigned int *msgprio, int space) 00428 { 00429 int q_index = mq - 1, size; 00430 MQMSG *msg_ptr; 00431 MSG_QUEUE *q; 00432 00433 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_RECV, mq, buflen, 0); 00434 00435 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00436 return -EBADF; 00437 } 00438 q = &rt_pqueue_descr[q_index]; 00439 if (can_access(q, FOR_READ) == FALSE) { 00440 return -EINVAL; 00441 } 00442 pthread_mutex_lock(&q->mutex); 00443 while (is_empty(&q->data)) { 00444 if (is_blocking(q)) { 00445 pthread_cond_wait(&q->emp_cond, &q->mutex); 00446 } else { 00447 pthread_mutex_unlock(&q->mutex); 00448 return -EAGAIN; 00449 } 00450 } 00451 msg_ptr = q->data.head; 00452 if (msg_ptr->hdr.size <= buflen) { 00453 size = msg_ptr->hdr.size; 00454 if (space) { 00455 memcpy(msg_buffer, &msg_ptr->data, size); 00456 *msgprio = msg_ptr->hdr.priority; 00457 } else { 00458 copy_to_user(msg_buffer, &msg_ptr->data, size); 00459 copy_to_user(msgprio, &msg_ptr->hdr.priority, sizeof(msgprio)); 00460 } 00461 } else { 00462 size = ERROR; 00463 } 00464 00465 q->data.head = msg_ptr->hdr.next; 00466 if(q->data.head == NULL) { 00467 q->data.head = q->data.tail = q->data.base; 00468 } 00469 freenode(msg_ptr, &q->data); 00470 msg_ptr->hdr.size = 0; 00471 msg_ptr->hdr.next = NULL; 00472 rt_pqueue_descr[q_index].data.attrs.mq_curmsgs--; 00473 00474 pthread_cond_signal(&q->full_cond); 00475 pthread_mutex_unlock(&q->mutex); 00476 00477 return size; 00478 } 00479 00480 00481 size_t _mq_timedreceive(mqd_t mq, char *msg_buffer, size_t buflen, unsigned int *msgprio, const struct timespec *abstime, int space) 00482 { 00483 int q_index = mq - 1, size; 00484 MQMSG *msg_ptr; 00485 MSG_QUEUE *q; 00486 00487 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_RECV, mq, buflen, 0); 00488 00489 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00490 return -EBADF; 00491 } 00492 q = &rt_pqueue_descr[q_index]; 00493 if (can_access(q, FOR_READ) == FALSE) { 00494 return -EINVAL; 00495 } 00496 pthread_mutex_lock(&q->mutex); 00497 while (is_empty(&q->data)) { 00498 if (is_blocking(q)) { 00499 struct timespec time; 00500 if (!space) { 00501 copy_from_user(&time, abstime, sizeof(struct timespec)); 00502 abstime = &time; 00503 } 00504 if (rt_cond_wait_until(&q->emp_cond, &q->mutex, timespec2count(abstime)) > 1) { 00505 pthread_mutex_unlock(&q->mutex); 00506 return -ETIMEDOUT; 00507 } 00508 } else { 00509 return -EAGAIN; 00510 } 00511 } 00512 msg_ptr = q->data.head; 00513 if (buflen < q->data.attrs.mq_msgsize) { 00514 pthread_mutex_unlock(&q->mutex); 00515 return -EMSGSIZE; 00516 } 00517 if (msg_ptr->hdr.size <= buflen) { 00518 size = msg_ptr->hdr.size; 00519 if (space) { 00520 memcpy(msg_buffer, &msg_ptr->data, size); 00521 *msgprio = msg_ptr->hdr.priority; 00522 } else { 00523 copy_to_user(msg_buffer, &msg_ptr->data, size); 00524 copy_to_user(msgprio, &msg_ptr->hdr.priority, sizeof(msgprio)); 00525 } 00526 } else { 00527 size = ERROR; 00528 } 00529 00530 q->data.head = msg_ptr->hdr.next; 00531 if(q->data.head == NULL) { 00532 q->data.head = q->data.tail = q->data.base; 00533 } 00534 freenode(msg_ptr, &q->data); 00535 msg_ptr->hdr.size = 0; 00536 msg_ptr->hdr.next = NULL; 00537 rt_pqueue_descr[q_index].data.attrs.mq_curmsgs--; 00538 00539 pthread_cond_signal(&q->full_cond); 00540 pthread_mutex_unlock(&q->mutex); 00541 00542 return size; 00543 } 00544 00545 00546 int _mq_send(mqd_t mq, const char *msg, size_t msglen, unsigned int msgprio, int space) 00547 { 00548 int q_index = mq - 1; 00549 MSG_QUEUE *q; 00550 MSG_HDR *this_msg; 00551 mq_bool_t q_was_empty; 00552 00553 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_SEND, mq, msglen, msgprio); 00554 00555 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00556 return -EBADF; 00557 } 00558 q = &rt_pqueue_descr[q_index]; 00559 if( can_access(q, FOR_WRITE) == FALSE) { 00560 return -EINVAL; 00561 } 00562 if(msgprio > MQ_PRIO_MAX) { 00563 return -EINVAL; 00564 } 00565 pthread_mutex_lock(&q->mutex); 00566 while (is_full(&q->data)) { 00567 if (is_blocking(q)) { 00568 pthread_cond_wait(&q->full_cond, &q->mutex); 00569 } else { 00570 pthread_mutex_unlock(&q->mutex); 00571 return -EAGAIN; 00572 } 00573 } 00574 if( (this_msg = getnode(&q->data)) == NULL) { 00575 pthread_mutex_unlock(&q->mutex); 00576 return -ENOBUFS; 00577 } 00578 q_was_empty = is_empty(&q->data); 00579 q->data.attrs.mq_curmsgs++; 00580 if (msglen > q->data.attrs.mq_msgsize) { 00581 pthread_mutex_unlock(&q->mutex); 00582 return -EMSGSIZE; 00583 } 00584 this_msg->size = msglen; 00585 this_msg->priority = msgprio; 00586 if (space) { 00587 memcpy(&((MQMSG *)this_msg)->data, msg, msglen); 00588 } else { 00589 copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen); 00590 } 00591 insert_message(&q->data, this_msg); 00592 pthread_cond_signal(&q->emp_cond); 00593 00594 if(q_was_empty && rt_pqueue_descr[q_index].notify.task != NULL) { 00595 //TODO: The bit that actually goes here!........... 00596 //Need to think about SIGNALS, the content of struct sigevent 00597 //and how these are/not supported under RTAI 00598 //...then do some rt_schedule() McHackery... 00599 00600 //Finally, remove the notification 00601 rt_pqueue_descr[q_index].notify.task = NULL; 00602 } 00603 pthread_mutex_unlock(&q->mutex); 00604 return msglen; 00605 } 00606 00607 00608 int _mq_timedsend(mqd_t mq, const char *msg, size_t msglen, unsigned int msgprio, const struct timespec *abstime, int space) 00609 { 00610 int q_index = mq - 1; 00611 MSG_QUEUE *q; 00612 MSG_HDR *this_msg; 00613 mq_bool_t q_was_empty; 00614 00615 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_SEND, mq, msglen, msgprio); 00616 00617 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00618 return -EBADF; 00619 } 00620 q = &rt_pqueue_descr[q_index]; 00621 if (can_access(q, FOR_WRITE) == FALSE) { 00622 return -EINVAL; 00623 } 00624 if (msgprio > MQ_PRIO_MAX) { 00625 return -EINVAL; 00626 } 00627 pthread_mutex_lock(&q->mutex); 00628 while (is_full(&q->data)) { 00629 if (is_blocking(q)) { 00630 struct timespec time; 00631 if (!space) { 00632 copy_from_user(&time, abstime, sizeof(struct timespec)); 00633 abstime = &time; 00634 } 00635 if (rt_cond_wait_until(&q->full_cond, &q->mutex, timespec2count(abstime)) > 1) { 00636 pthread_mutex_unlock(&q->mutex); 00637 return -ETIMEDOUT; 00638 } 00639 } else { 00640 pthread_mutex_unlock(&q->mutex); 00641 return -EAGAIN; 00642 } 00643 } 00644 if ((this_msg = getnode(&q->data)) == NULL) { 00645 pthread_mutex_unlock(&q->mutex); 00646 return -ENOBUFS; 00647 } 00648 q_was_empty = is_empty(&q->data); 00649 q->data.attrs.mq_curmsgs++; 00650 if (msglen > q->data.attrs.mq_msgsize) { 00651 pthread_mutex_unlock(&q->mutex); 00652 return -EMSGSIZE; 00653 } 00654 this_msg->size = msglen; 00655 this_msg->priority = msgprio; 00656 if (space) { 00657 memcpy(&((MQMSG *)this_msg)->data, msg, msglen); 00658 } else { 00659 copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen); 00660 } 00661 insert_message(&q->data, this_msg); 00662 pthread_cond_signal(&q->emp_cond); 00663 00664 if (q_was_empty && rt_pqueue_descr[q_index].notify.task != NULL) { 00665 00666 //TODO: The bit that actually goes here!........... 00667 //Need to think about SIGNALS and the content of struct sigevent 00668 //and how these are/not supported under RTAI 00669 //...then do some rt_schedule() McHackery... 00670 00671 //Finally, remove the notification 00672 rt_pqueue_descr[q_index].notify.task = NULL; 00673 } 00674 pthread_mutex_unlock(&q->mutex); 00675 return msglen; 00676 00677 } 00678 00679 00680 int mq_close(mqd_t mq) 00681 { 00682 int q_index = mq - 1; 00683 int q_ind; 00684 RT_TASK *this_task = _rt_whoami(); 00685 struct _pqueue_access_struct *task_queue_data_ptr; 00686 00687 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_CLOSE, mq, 0, 0); 00688 00689 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00690 return -EINVAL; 00691 } 00692 task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues; 00693 if (task_queue_data_ptr == NULL ) { 00694 return -EINVAL; 00695 } 00696 pthread_mutex_lock(&pqueue_mutex); 00697 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) { 00698 if (task_queue_data_ptr->q_access[q_ind].q_id == mq) { 00699 task_queue_data_ptr->q_access[q_ind].q_id = INVALID_PQUEUE; 00700 task_queue_data_ptr->n_open_pqueues--; 00701 break; 00702 } 00703 } 00704 if (q_ind == MQ_OPEN_MAX) { 00705 pthread_mutex_unlock(&pqueue_mutex); 00706 return -EINVAL; 00707 } 00708 pthread_mutex_lock(&rt_pqueue_descr[q_index].mutex); 00709 if (rt_pqueue_descr[q_index].notify.task == this_task) { 00710 rt_pqueue_descr[q_index].notify.task = NULL; 00711 } 00712 if (--rt_pqueue_descr[q_index].open_count <= 0 && 00713 rt_pqueue_descr[q_index].marked_for_deletion == TRUE ) { 00714 delete_queue(q_index); 00715 } else { 00716 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00717 } 00718 pthread_mutex_unlock(&pqueue_mutex); 00719 return OK; 00720 } 00721 00722 00723 int mq_getattr(mqd_t mq, struct mq_attr *attrbuf) 00724 { 00725 int q_index = mq - 1; 00726 00727 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_GET_ATTR, mq, 0, 0); 00728 00729 if (0 <= q_index && q_index < MAX_PQUEUES) { 00730 *attrbuf = rt_pqueue_descr[q_index].data.attrs; 00731 return OK; 00732 } 00733 return -EBADF; 00734 } 00735 00736 00737 int mq_setattr(mqd_t mq, const struct mq_attr *new_attrs, struct mq_attr *old_attrs) 00738 { 00739 int q_index = mq - 1; 00740 int q_ind; 00741 RT_TASK *this_task = _rt_whoami(); 00742 struct _pqueue_access_struct *task_queue_data_ptr; 00743 00744 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_SET_ATTR, mq, 0, 0); 00745 00746 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00747 return -EBADF; 00748 } 00749 if (old_attrs != NULL) { 00750 *old_attrs = rt_pqueue_descr[q_index].data.attrs; 00751 } 00752 task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues; 00753 if (task_queue_data_ptr == NULL) { 00754 return -EINVAL; 00755 } 00756 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) { 00757 if (task_queue_data_ptr->q_access[q_ind].q_id == mq) { 00758 if(new_attrs->mq_flags == MQ_NONBLOCK) { 00759 task_queue_data_ptr->q_access[q_ind].oflags |= O_NONBLOCK; 00760 } else if (new_attrs->mq_flags == MQ_BLOCK) { 00761 task_queue_data_ptr->q_access[q_ind].oflags &= ~O_NONBLOCK; 00762 } else { 00763 return -EINVAL; 00764 } 00765 break; 00766 } 00767 } 00768 if (q_ind == MQ_OPEN_MAX) { 00769 return -EINVAL; 00770 } 00771 pthread_mutex_lock(&rt_pqueue_descr[q_index].mutex); 00772 rt_pqueue_descr[q_index].data.attrs.mq_flags = new_attrs->mq_flags; 00773 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00774 return OK; 00775 } 00776 00777 00778 int mq_notify(mqd_t mq, const struct sigevent *notification) 00779 { 00780 int q_index = mq - 1; 00781 int rtn; 00782 00783 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_NOTIFY, mq, 0, 0); 00784 00785 if (q_index < 0 || q_index >= MAX_PQUEUES) { 00786 return -EBADF; 00787 } 00788 pthread_mutex_lock(&rt_pqueue_descr[q_index].mutex); 00789 if (notification != NULL) { 00790 if (rt_pqueue_descr[q_index].notify.task != NULL) { 00791 rt_pqueue_descr[q_index].notify.task = _rt_whoami(); 00792 rt_pqueue_descr[q_index].notify.data = *notification; 00793 rtn = OK; 00794 } else { 00795 rtn = ERROR; 00796 } 00797 } else { 00798 if (rt_pqueue_descr[q_index].notify.task == _rt_whoami()) { 00799 rt_pqueue_descr[q_index].notify.task = NULL; 00800 rtn = OK; 00801 } else { 00802 rtn = ERROR; 00803 } 00804 } 00805 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00806 return rtn; 00807 } 00808 00809 int mq_unlink(char *mq_name) 00810 { 00811 int q_index, rtn; 00812 00813 pthread_mutex_lock(&pqueue_mutex); 00814 q_index = name_to_id(mq_name); 00815 00816 TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_UNLINK, q_index, 0, 0); 00817 00818 if (q_index < 0) { 00819 pthread_mutex_unlock(&pqueue_mutex); 00820 return -ENOENT; 00821 } 00822 pthread_mutex_lock(&rt_pqueue_descr[q_index].mutex); 00823 if (rt_pqueue_descr[q_index].open_count > 0) { 00824 strcpy(rt_pqueue_descr[q_index].q_name, "\0"); 00825 rt_pqueue_descr[q_index].marked_for_deletion = TRUE; 00826 pthread_mutex_unlock(&rt_pqueue_descr[q_index].mutex); 00827 rtn = rt_pqueue_descr[q_index].open_count; 00828 } else { 00829 delete_queue(q_index); 00830 rtn = OK; 00831 } 00832 pthread_mutex_unlock(&pqueue_mutex); 00833 return rtn; 00834 } 00835 00836 /////////////////////////////////////////////////////////////////////////////// 00837 // PROC FILESYSTEM SECTION 00838 /////////////////////////////////////////////////////////////////////////////// 00839 00840 #ifdef CONFIG_PROC_FS 00841 00842 static int pqueue_read_proc(char *page, char **start, off_t off, int count, 00843 int *eof, void *data) 00844 { 00845 PROC_PRINT_VARS; 00846 int ind; 00847 00848 PROC_PRINT("\nRTAI Posix Queue Status\n"); 00849 PROC_PRINT("-----------------------\n\n"); 00850 PROC_PRINT("MAX_PQUEUES = %2d (system wide)\n", MAX_PQUEUES); 00851 PROC_PRINT("MQ_OPEN_MAX = %2d (per RT task)\n", MQ_OPEN_MAX); 00852 PROC_PRINT("MQ_NAME_MAX = %d\n", MQ_NAME_MAX); 00853 00854 PROC_PRINT("\nID NOpen NMsgs MaxMsgs MaxSz Perms Del Name\n"); 00855 PROC_PRINT("--------------------------------------------------------------------------------\n"); 00856 for (ind = 0; ind < MAX_PQUEUES; ind++) { 00857 if (rt_pqueue_descr[ind].q_name[0] || rt_pqueue_descr[ind].open_count) { 00858 PROC_PRINT( "%-3d %-6d ", 00859 rt_pqueue_descr[ind].q_id, 00860 rt_pqueue_descr[ind].open_count 00861 ); 00862 PROC_PRINT( "%-6ld %-6ld %-5ld ", 00863 rt_pqueue_descr[ind].data.attrs.mq_curmsgs, 00864 rt_pqueue_descr[ind].data.attrs.mq_maxmsg, 00865 rt_pqueue_descr[ind].data.attrs.mq_msgsize 00866 ); 00867 PROC_PRINT( "%-4o %c %s\n", 00868 rt_pqueue_descr[ind].permissions, 00869 rt_pqueue_descr[ind].marked_for_deletion ? '*' : ' ', 00870 rt_pqueue_descr[ind].q_name 00871 ); 00872 } 00873 } 00874 PROC_PRINT_DONE; 00875 } 00876 00877 static struct proc_dir_entry *proc_rtai_pqueue; 00878 00879 static int pqueue_proc_register(void) 00880 { 00881 proc_rtai_pqueue = create_proc_entry("pqueue", 0, rtai_proc_root); 00882 proc_rtai_pqueue->read_proc = pqueue_read_proc; 00883 return 0; 00884 } 00885 00886 static int pqueue_proc_unregister(void) 00887 { 00888 remove_proc_entry("pqueue", rtai_proc_root); 00889 return 0; 00890 } 00891 #endif 00892 00893 /////////////////////////////////////////////////////////////////////////////// 00894 // MODULE CONTROL 00895 /////////////////////////////////////////////////////////////////////////////// 00896 00897 struct rt_native_fun_entry rt_pqueue_entries[] = { 00898 { { UR1(1, 5) | UR2(4, 6), mq_open }, MQ_OPEN }, 00899 { { 1, _mq_receive }, MQ_RECEIVE }, 00900 { { 1, _mq_send }, MQ_SEND }, 00901 { { 1, mq_close }, MQ_CLOSE }, 00902 { { UW1(2, 3), mq_getattr }, MQ_GETATTR }, 00903 { { UR1(2, 4) | UW1(3, 4), mq_setattr }, MQ_SETATTR }, 00904 { { UR1(2, 3), mq_notify }, MQ_NOTIFY }, 00905 { { UR1(1, 2), mq_unlink }, MQ_UNLINK }, 00906 { { 1, _mq_timedreceive }, MQ_TIMEDRECEIVE }, 00907 { { 1, _mq_timedsend }, MQ_TIMEDSEND }, 00908 { { 0, 0 }, 000 } 00909 }; 00910 00911 extern int set_rt_fun_entries(struct rt_native_fun_entry *entry); 00912 extern void reset_rt_fun_entries(struct rt_native_fun_entry *entry); 00913 00914 int __rtai_mq_init(void) 00915 { 00916 num_pqueues = 0; 00917 pthread_mutex_init(&pqueue_mutex, NULL); 00918 #ifdef CONFIG_PROC_FS 00919 pqueue_proc_register(); 00920 #endif 00921 printk(KERN_INFO "RTAI[mq]: loaded.\n"); 00922 return set_rt_fun_entries(rt_pqueue_entries); 00923 return OK; 00924 } 00925 00926 void __rtai_mq_exit(void) 00927 { 00928 pthread_mutex_destroy(&pqueue_mutex); 00929 reset_rt_fun_entries(rt_pqueue_entries); 00930 #ifdef CONFIG_PROC_FS 00931 pqueue_proc_unregister(); 00932 #endif 00933 printk(KERN_INFO "RTAI[mq]: unloaded.\n"); 00934 } 00935 00936 #ifndef CONFIG_RTAI_MQ_BUILTIN 00937 module_init(__rtai_mq_init); 00938 module_exit(__rtai_mq_exit); 00939 #endif /* !CONFIG_RTAI_MQ_BUILTIN */ 00940 00941 #ifdef CONFIG_KBUILD 00942 EXPORT_SYMBOL(mq_open); 00943 EXPORT_SYMBOL(_mq_receive); 00944 EXPORT_SYMBOL(_mq_timedreceive); 00945 EXPORT_SYMBOL(_mq_send); 00946 EXPORT_SYMBOL(_mq_timedsend); 00947 EXPORT_SYMBOL(mq_close); 00948 EXPORT_SYMBOL(mq_getattr); 00949 EXPORT_SYMBOL(mq_setattr); 00950 EXPORT_SYMBOL(mq_notify); 00951 EXPORT_SYMBOL(mq_unlink); 00952 #endif /* CONFIG_KBUILD */

Generated on Thu Nov 20 11:49:50 2008 for RTAI API by doxygen 1.3.8