base/ipc/mq/mq.c

Go to the documentation of this file.
00001 /*
00002  * pqueues interface for Real Time Linux.
00003  *
00004  * Copyright (©) 1999 Zentropic Computing, All rights reserved
00005  *  
00006  * Authors:             Trevor Woolven (trevw@zentropix.com)
00007  *
00008  * Original date:       Thu 15 Jul 1999
00009  *
00010  * This program is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU General Public License as
00012  * published by the Free Software Foundation; either version 2 of the
00013  * License, or (at your option) any later version.
00014  *
00015  * This program is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License
00021  * along with this program; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00023  *
00024  * Timed services extension and user space integration for RTAI by
00025  * Paolo Mantegazza <mantegazza@aero.polimi.it>.
00026  * 2005, cleaned and revised Paolo Mantegazza <mantegazza@aero.polimi.it>.
00027  *
00028  */
00029 
00030 #include <linux/module.h>
00031 #include <linux/kernel.h>
00032 #include <linux/version.h>
00033 #include <linux/errno.h>
00034 #include <linux/stat.h>
00035 #include <asm/uaccess.h>
00036 #ifdef CONFIG_PROC_FS
00037 #include <linux/proc_fs.h>
00038 extern struct proc_dir_entry *rtai_proc_root;
00039 #endif
00040 #include <rtai_schedcore.h>
00041 #include <rtai_proc_fs.h>
00042 #include <rtai_signal.h>
00043 
00044 MODULE_LICENSE("GPL");
00045 
00046 #define mq_cond_t                   SEM
00047 #define mq_mutex_t                  SEM
00048 #define mq_mutex_init(mutex, attr)  rt_typed_sem_init(mutex, 1, BIN_SEM | PRIO_Q)
00049 #define mq_mutex_unlock             rt_sem_signal
00050 #define mq_mutex_lock(mutex) \
00051     do { \
00052         if (abs(rt_sem_wait(mutex)) >= RTE_LOWERR) { \
00053             return -EBADF; \
00054         } \
00055     } while (0)     
00056 #define mq_mutex_timedlock(mutex, abstime) \
00057     do { \
00058         RTIME t = timespec2count(abstime); \
00059         int ret; \
00060         if (abs(ret = rt_sem_wait_until(mutex, t)) >= RTE_LOWERR) { \
00061             return ret == RTE_TIMOUT ? -ETIMEDOUT : -EBADF; \
00062         } \
00063     } while (0)
00064 #define mq_mutex_trylock            rt_sem_wait_if
00065 #define mq_mutex_destroy            rt_sem_delete
00066 #define mq_cond_init(cond, attr)    rt_sem_init(cond, 0)
00067 #define mq_cond_wait(cond, mutex) \
00068     do { \
00069         rt_sem_signal(mutex); \
00070         if (abs(rt_sem_wait(cond)) >= RTE_LOWERR) { \
00071             return -EBADF; \
00072         } \
00073         if (abs(rt_sem_wait(mutex)) >= RTE_LOWERR) { \
00074             rt_sem_signal(cond); \
00075             return -EBADF; \
00076         } \
00077     } while (0)
00078 #define mq_cond_timedwait(cond, mutex, abstime) \
00079     do { \
00080         RTIME t = timespec2count(abstime); \
00081         int ret; \
00082         rt_sem_signal(mutex); \
00083         if (abs(ret = rt_sem_wait_until(cond, t)) >= RTE_LOWERR) { \
00084             return ret == RTE_TIMOUT ? -ETIMEDOUT : -EBADF; \
00085         } \
00086         if (abs(ret = rt_sem_wait_until(mutex, t)) >= RTE_LOWERR) { \
00087             rt_sem_signal(cond); \
00088             return ret == RTE_TIMOUT ? -ETIMEDOUT : -EBADF; \
00089         } \
00090     } while (0)
00091 #define mq_cond_signal              rt_sem_signal
00092 #define mq_cond_destroy             rt_sem_delete
00093 
00094 #ifndef OK
00095 #define OK  0
00096 #endif
00097 #ifndef ERROR
00098 #define ERROR  -1
00099 #endif
00100 
00101 ///////////////////////////////////////////////////////////////////////////////
00102 //      LOCAL DEFINITIONS
00103 ///////////////////////////////////////////////////////////////////////////////
00104 
00105 #define MAX_RT_TASKS 128
00106 
00107 ///////////////////////////////////////////////////////////////////////////////
00108 //      PACKAGE GLOBAL DATA
00109 ///////////////////////////////////////////////////////////////////////////////
00110 
00111 static int num_pqueues = 0;
00112 static struct _pqueue_descr_struct rt_pqueue_descr[MAX_PQUEUES] = {{0}};
00113 static struct _pqueue_access_struct task_pqueue_access[MAX_RT_TASKS] = {{0}};
00114 static MQ_ATTR default_queue_attrs = { MAX_MSGS, MAX_MSGSIZE, MQ_NONBLOCK, 0 };
00115 
00116 static mq_mutex_t pqueue_mutex;
00117 
00118 ///////////////////////////////////////////////////////////////////////////////
00119 //      LOCAL FUNCTIONS
00120 ///////////////////////////////////////////////////////////////////////////////
00121 
00122 static int name_to_id(char *name)
00123 {
00124     int ind;
00125     for (ind = 0; ind < MAX_PQUEUES; ind++) {
00126         if (rt_pqueue_descr[ind].q_name[0] && !strcmp(rt_pqueue_descr[ind].q_name, name)) {
00127             return ind;
00128         }
00129     } 
00130     return ERROR;
00131 }
00132 
00133 
00134 static inline mq_bool_t is_empty(struct queue_control *q)
00135 {
00136     return !q->attrs.mq_curmsgs;
00137 }
00138 
00139 
00140 static inline mq_bool_t is_full(struct queue_control *q)
00141 {
00142     return q->attrs.mq_curmsgs == q->attrs.mq_maxmsg;
00143 }
00144 
00145 
00146 static inline MSG_HDR* getnode(Q_CTRL *queue)
00147 {
00148     return queue->attrs.mq_curmsgs < queue->attrs.mq_maxmsg ? queue->nodes[queue->attrs.mq_curmsgs++] : NULL;
00149 }
00150 
00151 static inline int freenode(void *node, Q_CTRL *queue)
00152 {
00153     if (queue->attrs.mq_curmsgs > 0) {
00154                 queue->nodes[--queue->attrs.mq_curmsgs] = node;
00155                 return 0;
00156         }
00157         return -EINVAL;
00158 }
00159 
00160 
00161 static void insert_message(Q_CTRL *q, MSG_HDR *this_msg)
00162 {
00163 //This function finds the appropriate point in a priority queue to
00164 //insert the supplied message. It preserves FIFO order within each
00165 //priority levela and can therefore be used for FIFO queuing policies
00166 //simply by making the priority equal to the supplied message priority
00167 
00168     MSG_HDR *prev, *insertpt;
00169 
00170 //Do a quick check in case the message at the back of the queue has
00171 //a higher priority than this one, in which case this one can just
00172 //go at the back of the queue. 
00173 //Remember that Posix priorities increase from 0 to (at least) 32
00174 
00175     if (((MSG_HDR *)q->tail)->priority >= this_msg->priority) {
00176         ((MSG_HDR*)q->tail)->next = this_msg;
00177         q->tail = this_msg;
00178     } else {
00179         prev = insertpt = q->head;
00180 //POSIX queues preserve FIFO ordering of messages within
00181 //a particular priority level
00182         while (insertpt->priority >= this_msg->priority) {
00183                 prev = insertpt;
00184                 insertpt = insertpt->next; 
00185         }
00186 //We've now found a message (or messages) of equal or lower
00187 //priority than the one we're trying to put onto the queue
00188         if (insertpt == q->head) {
00189             this_msg->next = q->head;
00190             q->head = this_msg;
00191         } else {
00192             this_msg->next = prev->next;
00193             prev->next = this_msg;
00194         }   
00195     }
00196 }
00197 
00198 #undef mqueues
00199 #define mqueues system_data_ptr
00200 
00201 static mq_bool_t is_blocking(MSG_QUEUE *q)
00202 {
00203     int q_ind;
00204     struct _pqueue_access_data *aces;
00205 
00206     aces = ((QUEUE_CTRL)_rt_whoami()->mqueues)->q_access;
00207     for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00208         if (aces[q_ind].q_id == q->q_id) {
00209             return !(aces[q_ind].oflags & O_NONBLOCK);
00210         }
00211     }
00212     return FALSE;
00213 }
00214 
00215 
00216 static mq_bool_t can_access(MSG_QUEUE *q, Q_ACCESS access)
00217 {
00218     RT_TASK *caller = _rt_whoami();
00219 
00220     if (q->owner == caller ? (((access == FOR_READ) && (q->permissions & S_IRUSR)) || ((access == FOR_WRITE) && (q->permissions & S_IWUSR))) : (((access == FOR_READ) && (q->permissions & S_IRGRP)) || ((access == FOR_WRITE) && (q->permissions & S_IWGRP)))) {
00221         int q_ind;
00222         struct _pqueue_access_data *aces;
00223         struct _pqueue_access_struct *task_queue_data_ptr;
00224         int q_access_flags = 0;
00225 
00226         task_queue_data_ptr = (QUEUE_CTRL)caller->mqueues;
00227         if (task_queue_data_ptr == NULL) {
00228             return FALSE;
00229         }
00230         aces = task_queue_data_ptr->q_access;
00231         for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00232             if (aces[q_ind].q_id == q->q_id) {
00233                 q_access_flags = aces[q_ind].oflags;
00234                 goto set_mode;
00235             }
00236         }
00237         return FALSE;
00238 set_mode:   if (access == FOR_WRITE) {
00239             if ((q_access_flags & O_WRONLY) || (q_access_flags & O_RDWR)) {
00240                 return TRUE;
00241             }
00242         } else {
00243             return TRUE;
00244         }
00245     }
00246     return FALSE;
00247 }
00248 
00249 
00250 static inline void initialise_queue(Q_CTRL *q)
00251 {
00252     int msg_size, msg_ind;
00253     void *msg_ptr;
00254 
00255     msg_size = q->attrs.mq_msgsize + sizeof(MSG_HDR);
00256     msg_ptr = q->base;
00257     q->nodes = msg_ptr + msg_size*q->attrs.mq_maxmsg; 
00258     for (msg_ind = 0; msg_ind < q->attrs.mq_maxmsg; msg_ind++) {
00259         q->nodes[msg_ind] = msg_ptr;
00260         ((MSG_HDR *)msg_ptr)->size = 0;
00261         ((MSG_HDR *)msg_ptr)->priority = MQ_MIN_MSG_PRIORITY;
00262         ((MSG_HDR *)msg_ptr)->next = NULL;
00263         msg_ptr += msg_size;
00264     }
00265 }
00266 
00267 
00268 static void delete_queue(int q_index)
00269 {
00270     rt_free(rt_pqueue_descr[q_index].data.base);
00271 
00272     rt_pqueue_descr[q_index].owner = NULL;
00273     rt_pqueue_descr[q_index].open_count = 0;
00274     strcpy(rt_pqueue_descr[q_index].q_name, "\0");
00275     rt_pqueue_descr[q_index].q_id = INVALID_PQUEUE;
00276     rt_pqueue_descr[q_index].data.base = NULL;
00277     rt_pqueue_descr[q_index].data.head = NULL;
00278     rt_pqueue_descr[q_index].data.tail = NULL;
00279     rt_pqueue_descr[q_index].data.attrs = (MQ_ATTR){ 0, 0, 0, 0 };
00280     rt_pqueue_descr[q_index].permissions = 0;
00281 
00282     mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00283     mq_mutex_destroy(&rt_pqueue_descr[q_index].mutex);
00284     mq_cond_destroy(&rt_pqueue_descr[q_index].emp_cond);
00285     mq_cond_destroy(&rt_pqueue_descr[q_index].full_cond);
00286 
00287     if (num_pqueues > 0) {  
00288         num_pqueues--;  
00289     }
00290 }
00291 
00292 static void signal_suprt_fun_mq(void *fun_arg)
00293 {           
00294     struct suprt_fun_arg { RT_TASK *sigtask; RT_TASK *task; mqd_t mq; } arg = *(struct suprt_fun_arg *)fun_arg;
00295     
00296     arg.sigtask = RT_CURRENT;
00297     if (!rt_request_signal_(arg.sigtask, arg.task, (arg.mq + MAXSIGNALS))) {
00298         while (rt_wait_signal(arg.sigtask, arg.task)) {
00299             rt_pqueue_descr[arg.mq - 1].notify.data._sigev_un._sigev_thread._function((sigval_t)rt_pqueue_descr[arg.mq - 1].notify.data.sigev_value.sival_ptr);
00300         }
00301     } else {
00302         rt_task_resume(arg.task);
00303     }
00304 }
00305 
00306 int rt_request_signal_mq(mqd_t mq)
00307 {
00308         RT_TASK *sigtask;
00309         struct suprt_fun_arg { RT_TASK *sigtask; RT_TASK *task; mqd_t mq; } arg = { NULL, rt_whoami(), mq };
00310         if ((sigtask = rt_malloc(sizeof(RT_TASK)))) {
00311             rt_task_init_cpuid(sigtask, (void *)signal_suprt_fun_mq, (long)&arg, SIGNAL_TASK_STACK_SIZE, arg.task->priority, 0, 0, RT_CURRENT->runnable_on_cpus);
00312             rt_task_resume(sigtask);
00313             rt_task_suspend(arg.task);
00314             return arg.task->retval;
00315         }
00316     return -EINVAL;
00317 }
00318 
00319 ///////////////////////////////////////////////////////////////////////////////
00320 //      POSIX MESSAGE QUEUES API
00321 ///////////////////////////////////////////////////////////////////////////////
00322 
00323 RTAI_SYSCALL_MODE mqd_t _mq_open(char *mq_name, int oflags, mode_t permissions, struct mq_attr *mq_attr, long space)
00324 {
00325     int q_index, t_index, q_ind;
00326     int spare_count = 0, first_spare = 0;
00327     mq_bool_t q_found = FALSE;
00328     RT_TASK *this_task = _rt_whoami();
00329     struct _pqueue_access_struct *task_data_ptr;
00330 
00331     task_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00332 
00333     mq_mutex_lock(&pqueue_mutex);
00334     if ((q_index = name_to_id(mq_name)) >= 0) {
00335 //========================
00336 // OPEN AN EXISTING QUEUE
00337 //========================
00338         if ((oflags & O_CREAT) && (oflags & O_EXCL)) {
00339             mq_mutex_unlock(&pqueue_mutex);
00340             return -EEXIST;
00341         }
00342         if (task_data_ptr == NULL) {
00343             for (t_index = 0; t_index < MAX_RT_TASKS; t_index++) {
00344                 if (task_pqueue_access[t_index].this_task == NULL) {
00345                     task_data_ptr = &(task_pqueue_access[t_index]);
00346                     task_data_ptr->this_task = this_task;
00347                     this_task->mqueues = task_data_ptr;
00348                     break;
00349                 }
00350             }
00351             if (t_index == MAX_RT_TASKS) {
00352                 mq_mutex_unlock(&pqueue_mutex);
00353                 return -ENOMEM;
00354             }
00355         }
00356     //Now record that this task has opened this queue and
00357     //the access permissions required
00358     //Check first to see if this task has already opened this queue
00359     //and while doing so, record the number of spare 'slots' for this
00360     //task to have further opened queues
00361     mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00362         for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00363             if (task_data_ptr->q_access[q_ind].q_id == rt_pqueue_descr[q_index].q_id) { 
00364                 q_found = TRUE;
00365                 break;
00366             } else if(task_data_ptr->q_access[q_ind].q_id == INVALID_PQUEUE) {
00367                 if (spare_count == 0) {
00368                     first_spare = q_ind;
00369                 }           
00370                 spare_count++;
00371             }
00372         }   
00373     //If the task has not already opened this queue and there are no
00374     //more available slots, can't do anymore...
00375         if (!q_found && spare_count == 0) {
00376             mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00377             mq_mutex_unlock(&pqueue_mutex);
00378             return -EINVAL;
00379         }
00380     //Either the queue has already been opened and so we can re-use
00381     //it's slot, or a new one is being opened in an unused slot
00382         if (!q_found) {
00383         //Open a new one, using the first free slot
00384             task_data_ptr->n_open_pqueues++;
00385             q_ind = first_spare;
00386         }
00387         task_data_ptr->q_access[q_ind].q_id = rt_pqueue_descr[q_index].q_id;
00388         task_data_ptr->q_access[q_ind].oflags = oflags;
00389         mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00390     } else if (oflags & O_CREAT) {
00391 //================
00392 // CREATE A QUEUE 
00393 //================
00394         if(num_pqueues >= MAX_PQUEUES) {
00395             mq_mutex_unlock(&pqueue_mutex);
00396             return -ENOMEM;
00397         }
00398     //Check the size of the name
00399         if( strlen(mq_name) >= MQ_NAME_MAX) {
00400             mq_mutex_unlock(&pqueue_mutex);
00401             return -ENAMETOOLONG;
00402         }
00403     //Allocate a task pqueue access structure to this task, if necessary.
00404     //Otherwise, check that this task has not already opened too many 
00405     //queues
00406     //
00407         if (task_data_ptr == NULL) {
00408         //Find a spare task pqueue access slot for this task
00409             for (t_index = 0; t_index < MAX_RT_TASKS; t_index++) {
00410                 if (task_pqueue_access[t_index].this_task == NULL) {
00411                     task_data_ptr = &task_pqueue_access[t_index];
00412                     task_data_ptr->this_task = this_task;
00413                     this_task->mqueues = task_data_ptr;
00414                     break;
00415                 }
00416             }
00417             if (t_index == MAX_RT_TASKS) {
00418                 mq_mutex_unlock(&pqueue_mutex);
00419                 return -ENOMEM;
00420             }
00421         } else if (task_data_ptr->n_open_pqueues >= MQ_OPEN_MAX) {
00422             mq_mutex_unlock(&pqueue_mutex);
00423             return -EINVAL;
00424         }
00425     //Look for default queue attributes
00426         if (mq_attr == NULL) {
00427             mq_attr = &default_queue_attrs;
00428         }
00429     //Find a spare descriptor for this queue
00430         for (q_index = 0; q_index < MAX_PQUEUES; q_index++) {
00431             if (rt_pqueue_descr[q_index].q_id == INVALID_PQUEUE) {
00432                 int msg_size, queue_size;
00433                 void *mem_ptr;
00434         //Get memory for the data queue
00435                 msg_size = mq_attr->mq_msgsize + sizeof(MSG_HDR);
00436                 queue_size = (msg_size + sizeof(void *))*mq_attr->mq_maxmsg;
00437                 mem_ptr = rt_malloc(queue_size);
00438                 if(mem_ptr == NULL) {
00439                     mq_mutex_unlock(&pqueue_mutex);
00440                     return -ENOMEM;
00441                 }
00442                 rt_pqueue_descr[q_index].data.base = mem_ptr; 
00443         //Initialise the Message Queue descriptor
00444                     rt_pqueue_descr[q_index].owner = this_task;
00445                     rt_pqueue_descr[q_index].open_count = 0;
00446                 strcpy(rt_pqueue_descr[q_index].q_name, mq_name);
00447                 rt_pqueue_descr[q_index].q_id = q_index + 1;
00448                 rt_pqueue_descr[q_index].marked_for_deletion = FALSE;
00449                 rt_pqueue_descr[q_index].data.head = 
00450                 rt_pqueue_descr[q_index].data.tail = rt_pqueue_descr[q_index].data.base;
00451                 rt_pqueue_descr[q_index].data.attrs = *(mq_attr);
00452                 rt_pqueue_descr[q_index].data.attrs.mq_curmsgs = 0;
00453                 rt_pqueue_descr[q_index].permissions = permissions;
00454         //Initialise conditional variables used for blocking
00455                 mq_cond_init(&rt_pqueue_descr[q_index].emp_cond, NULL);
00456                 mq_cond_init(&rt_pqueue_descr[q_index].full_cond, NULL);
00457                 mq_mutex_init(&rt_pqueue_descr[q_index].mutex, NULL);
00458 
00459         //Clear the queue contents
00460                 initialise_queue(&rt_pqueue_descr[q_index].data);
00461         //Initialise the Task Queue access descriptor
00462                 q_ind = task_data_ptr->n_open_pqueues++;
00463                 task_data_ptr->q_access[q_ind].q_id = q_index + 1;
00464                 task_data_ptr->q_access[q_ind].oflags = oflags;
00465                 break;
00466             }
00467             }
00468         if(q_index >= MAX_PQUEUES) {
00469             mq_mutex_unlock(&pqueue_mutex);
00470             return -EMFILE;
00471         }
00472         num_pqueues++;
00473     } else {
00474 //==============================
00475 // OPENING A NON-EXISTANT QUEUE
00476 //==============================
00477         mq_mutex_unlock(&pqueue_mutex);
00478         return -ENOENT;
00479     }
00480     
00481     // Return the message queue's id and mark it as open
00482     rt_pqueue_descr[q_index].open_count++;
00483     mq_mutex_unlock(&pqueue_mutex);
00484     
00485     // Prepare notify task 
00486     if ((oflags & O_NOTIFY_NP) && space == 0)   {
00487         rt_request_signal_mq(rt_pqueue_descr[q_index].q_id);
00488     }
00489     
00490     return (mqd_t)rt_pqueue_descr[q_index].q_id;
00491 }
00492 EXPORT_SYMBOL(_mq_open);
00493 
00494 RTAI_SYSCALL_MODE size_t _mq_receive(mqd_t mq, char *msg_buffer, size_t buflen, unsigned int *msgprio, int space)
00495 {
00496     int q_index = mq - 1, size;
00497     MQMSG *msg_ptr;
00498     MSG_QUEUE *q;
00499 
00500     if (q_index < 0 || q_index >= MAX_PQUEUES) { 
00501         return -EBADF;
00502     }
00503     q = &rt_pqueue_descr[q_index];
00504     if (buflen < q->data.attrs.mq_msgsize) {
00505         return -EMSGSIZE;
00506     }
00507     if (can_access(q, FOR_READ) == FALSE) {
00508         return -EINVAL;
00509     }
00510     if (is_blocking(q)) {
00511         mq_mutex_lock(&q->mutex);
00512     } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00513         return -EAGAIN;
00514     }
00515         while (is_empty(&q->data)) {
00516         if (is_blocking(q)) {
00517             mq_cond_wait(&q->emp_cond, &q->mutex);
00518         } else {
00519             mq_mutex_unlock(&q->mutex);
00520             return -EAGAIN;
00521         }
00522     }
00523         msg_ptr = q->data.head;
00524         if (msg_ptr->hdr.size <= buflen) {
00525         size = msg_ptr->hdr.size;
00526         if (space) {
00527             memcpy(msg_buffer, &msg_ptr->data, size);
00528             if (msgprio) {
00529                 *msgprio = msg_ptr->hdr.priority;
00530             }
00531         } else {
00532             rt_copy_to_user(msg_buffer, &msg_ptr->data, size);
00533             if (msgprio) {
00534                 rt_put_user(msg_ptr->hdr.priority, msgprio);
00535             }
00536         }
00537     } else {
00538         size = ERROR;
00539     }
00540     q->data.head = msg_ptr->hdr.next;
00541     msg_ptr->hdr.size = 0;
00542         msg_ptr->hdr.next = NULL;
00543         freenode(msg_ptr, &q->data);
00544     if(q->data.head == NULL) {
00545         q->data.head = q->data.tail = q->data.nodes[0];
00546     }
00547     mq_cond_signal(&q->full_cond);
00548     mq_mutex_unlock(&q->mutex);
00549     return size;
00550 }
00551 EXPORT_SYMBOL(_mq_receive);
00552 
00553 RTAI_SYSCALL_MODE size_t _mq_timedreceive(mqd_t mq, char *msg_buffer, size_t buflen, unsigned int *msgprio, const struct timespec *abstime, int space)
00554 {
00555     int q_index = mq - 1, size;
00556     MQMSG *msg_ptr;
00557     MSG_QUEUE *q;
00558     struct timespec time;
00559     
00560     if (q_index < 0 || q_index >= MAX_PQUEUES) { 
00561         return -EBADF;
00562     }
00563     q = &rt_pqueue_descr[q_index];
00564     if (buflen < q->data.attrs.mq_msgsize) {
00565         return -EMSGSIZE;
00566     }
00567     if (can_access(q, FOR_READ) == FALSE) {
00568         return -EINVAL;
00569     }
00570     if (!space) {
00571         rt_copy_from_user(&time, abstime, sizeof(struct timespec));
00572         abstime = &time;
00573     }
00574     if (is_blocking(q)) {
00575         mq_mutex_timedlock(&q->mutex, abstime);
00576     } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00577         return -EAGAIN;
00578     }
00579     while (is_empty(&q->data)) {
00580         if (is_blocking(q)) {
00581             mq_cond_timedwait(&q->emp_cond, &q->mutex, abstime);
00582         } else {
00583             return -EAGAIN;
00584         }
00585     }
00586     msg_ptr = q->data.head;
00587     if (msg_ptr->hdr.size <= buflen) {
00588         size = msg_ptr->hdr.size;
00589         if (space) {
00590             memcpy(msg_buffer, &msg_ptr->data, size);
00591             if (msgprio) {
00592                 *msgprio = msg_ptr->hdr.priority;
00593             }
00594         } else {
00595             rt_copy_to_user(msg_buffer, &msg_ptr->data, size);
00596             if (msgprio) {
00597                 rt_put_user(msg_ptr->hdr.priority, msgprio);
00598             }
00599         }
00600     } else {
00601         size = ERROR;
00602     }
00603     q->data.head = msg_ptr->hdr.next;
00604     msg_ptr->hdr.size = 0;
00605     msg_ptr->hdr.next = NULL;
00606         freenode(msg_ptr, &q->data);
00607     if(q->data.head == NULL) {
00608         q->data.head = q->data.tail = q->data.nodes[0];
00609     }
00610     mq_cond_signal(&q->full_cond);
00611     mq_mutex_unlock(&q->mutex);
00612     return size;
00613 }
00614 EXPORT_SYMBOL(_mq_timedreceive);
00615 
00616 RTAI_SYSCALL_MODE int _mq_send(mqd_t mq, const char *msg, size_t msglen, unsigned int msgprio, int space)
00617 {
00618     int q_index = mq - 1;
00619     MSG_QUEUE *q;
00620     MSG_HDR *this_msg;
00621     mq_bool_t q_was_empty;
00622 
00623     if (q_index < 0 || q_index >= MAX_PQUEUES) { 
00624         return -EBADF;
00625     }
00626     q = &rt_pqueue_descr[q_index];
00627     if( can_access(q, FOR_WRITE) == FALSE) {
00628         return -EINVAL;
00629     }
00630     if(msgprio > MQ_PRIO_MAX) {
00631         return -EINVAL;
00632     }
00633     if (is_blocking(q)) {
00634         mq_mutex_lock(&q->mutex);
00635     } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00636         return -EAGAIN;
00637     }
00638     q_was_empty = is_empty(&q->data);
00639     while (is_full(&q->data)) {
00640         if (is_blocking(q)) {
00641             mq_cond_wait(&q->full_cond, &q->mutex);
00642         } else {
00643             mq_mutex_unlock(&q->mutex);
00644             return -EAGAIN;
00645         }
00646         }
00647     if( (this_msg = getnode(&q->data)) == NULL) {
00648         mq_mutex_unlock(&q->mutex);
00649         return -ENOBUFS;
00650     }
00651     if (msglen > q->data.attrs.mq_msgsize) {
00652         mq_mutex_unlock(&q->mutex);
00653         return -EMSGSIZE;
00654     }
00655     this_msg->size = msglen;
00656     this_msg->priority = msgprio;
00657     if (space) {
00658         memcpy(&((MQMSG *)this_msg)->data, msg, msglen);
00659     } else {
00660         rt_copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen);
00661     }
00662     insert_message(&q->data, this_msg);
00663     mq_cond_signal(&q->emp_cond);
00664     if(q_was_empty && rt_pqueue_descr[q_index].notify.task != NULL) {
00665         rt_trigger_signal((MAXSIGNALS + mq), rt_pqueue_descr[q_index].notify.task);
00666         rt_pqueue_descr[q_index].notify.task = NULL;
00667     }
00668     mq_mutex_unlock(&q->mutex);
00669     return msglen;
00670 }
00671 EXPORT_SYMBOL(_mq_send);
00672 
00673 RTAI_SYSCALL_MODE int _mq_timedsend(mqd_t mq, const char *msg, size_t msglen, unsigned int msgprio, const struct timespec *abstime, int space)
00674 {
00675     int q_index = mq - 1;
00676     MSG_QUEUE *q;
00677     MSG_HDR *this_msg;
00678     mq_bool_t q_was_empty;
00679     struct timespec time;
00680 
00681     if (q_index < 0 || q_index >= MAX_PQUEUES) { 
00682         return -EBADF;
00683     }
00684     q = &rt_pqueue_descr[q_index];
00685     if (can_access(q, FOR_WRITE) == FALSE) {
00686         return -EINVAL;
00687     }
00688     if (msgprio > MQ_PRIO_MAX) {
00689         return -EINVAL;
00690     }           
00691     if (!space) {
00692         rt_copy_from_user(&time, abstime, sizeof(struct timespec));
00693         abstime = &time;
00694     }
00695     if (is_blocking(q)) {
00696         mq_mutex_timedlock(&q->mutex, abstime);
00697     } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00698         return -EAGAIN;
00699     }
00700     q_was_empty = is_empty(&q->data);
00701     while (is_full(&q->data)) {
00702         if (is_blocking(q)) {
00703             mq_cond_timedwait(&q->full_cond, &q->mutex, abstime);
00704         } else {
00705             mq_mutex_unlock(&q->mutex);
00706             return -EAGAIN;
00707         }
00708     }
00709     if ((this_msg = getnode(&q->data)) == NULL) {
00710         mq_mutex_unlock(&q->mutex);
00711         return -ENOBUFS;
00712     }
00713     if (msglen > q->data.attrs.mq_msgsize) {
00714         mq_mutex_unlock(&q->mutex);
00715         return -EMSGSIZE;
00716     }
00717     this_msg->size = msglen;
00718     this_msg->priority = msgprio;
00719     if (space) {
00720         memcpy(&((MQMSG *)this_msg)->data, msg, msglen);
00721     } else {
00722         rt_copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen);
00723     }
00724     insert_message(&q->data, this_msg);
00725     mq_cond_signal(&q->emp_cond);
00726     if (q_was_empty && rt_pqueue_descr[q_index].notify.task != NULL) {
00727         rt_trigger_signal((MAXSIGNALS + mq), rt_pqueue_descr[q_index].notify.task);
00728         rt_pqueue_descr[q_index].notify.task = NULL;
00729     }
00730     mq_mutex_unlock(&q->mutex);
00731     return msglen;
00732 
00733 }
00734 EXPORT_SYMBOL(_mq_timedsend);
00735 
00736 RTAI_SYSCALL_MODE int mq_close(mqd_t mq)
00737 {
00738     int q_index = mq - 1;
00739     int q_ind;
00740     RT_TASK *this_task = _rt_whoami();
00741     struct _pqueue_access_struct *task_queue_data_ptr;
00742 
00743     if (q_index < 0 || q_index >= MAX_PQUEUES) { 
00744         return -EINVAL;
00745     }
00746     task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00747     if (task_queue_data_ptr == NULL ) {
00748         return -EINVAL;
00749     }
00750     mq_mutex_lock(&pqueue_mutex);
00751     for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00752         if (task_queue_data_ptr->q_access[q_ind].q_id == mq) {
00753             task_queue_data_ptr->q_access[q_ind].q_id = INVALID_PQUEUE;
00754             task_queue_data_ptr->q_access[q_ind].usp_notifier = NULL;
00755             rt_release_signal((mq + MAXSIGNALS), task_queue_data_ptr->this_task);
00756             task_queue_data_ptr->n_open_pqueues--;
00757             break;
00758         }
00759     }
00760     if (q_ind == MQ_OPEN_MAX) {
00761         mq_mutex_unlock(&pqueue_mutex);
00762         return -EINVAL;
00763     }
00764     mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00765     if (rt_pqueue_descr[q_index].notify.task == this_task) {
00766         rt_pqueue_descr[q_index].notify.task = NULL;
00767     }
00768         if (--rt_pqueue_descr[q_index].open_count <= 0 &&
00769         rt_pqueue_descr[q_index].marked_for_deletion == TRUE ) {
00770         delete_queue(q_index);
00771     }
00772     mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00773     mq_mutex_unlock(&pqueue_mutex);
00774         return OK;
00775 }
00776 EXPORT_SYMBOL(mq_close);
00777 
00778 RTAI_SYSCALL_MODE int mq_getattr(mqd_t mq, struct mq_attr *attrbuf)
00779 {
00780     int q_index = mq - 1;
00781 
00782     if (0 <= q_index && q_index < MAX_PQUEUES) { 
00783         *attrbuf = rt_pqueue_descr[q_index].data.attrs;
00784         return OK;
00785     }
00786     return -EBADF;
00787 }
00788 EXPORT_SYMBOL(mq_getattr);
00789 
00790 RTAI_SYSCALL_MODE int mq_setattr(mqd_t mq, const struct mq_attr *new_attrs, struct mq_attr *old_attrs)
00791 {
00792     int q_index = mq - 1;
00793     int q_ind;
00794     RT_TASK *this_task = _rt_whoami();
00795     struct _pqueue_access_struct *task_queue_data_ptr;
00796 
00797     if (q_index < 0 || q_index >= MAX_PQUEUES) {
00798         return -EBADF;
00799     }
00800     if (old_attrs != NULL) { 
00801         *old_attrs = rt_pqueue_descr[q_index].data.attrs;
00802     }
00803     task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00804     if (task_queue_data_ptr == NULL) {
00805         return -EINVAL;
00806     }
00807     for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00808         if (task_queue_data_ptr->q_access[q_ind].q_id == mq) {
00809             if(new_attrs->mq_flags == MQ_NONBLOCK) {
00810                 task_queue_data_ptr->q_access[q_ind].oflags |= O_NONBLOCK;
00811             } else if (new_attrs->mq_flags == MQ_BLOCK) {
00812                 task_queue_data_ptr->q_access[q_ind].oflags &= ~O_NONBLOCK;
00813             } else {
00814                 return -EINVAL;
00815             }
00816                 break;
00817         }
00818     }
00819     if (q_ind == MQ_OPEN_MAX) {
00820         return -EINVAL;
00821     }
00822     mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00823     rt_pqueue_descr[q_index].data.attrs.mq_flags = new_attrs->mq_flags;
00824     mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00825     return OK;
00826 }
00827 EXPORT_SYMBOL(mq_setattr);
00828 
00829 RTAI_SYSCALL_MODE int mq_reg_usp_notifier(mqd_t mq, RT_TASK *task, struct sigevent *usp_notification)
00830 {
00831     mq_mutex_lock(&rt_pqueue_descr[mq -1].mutex);
00832     ((QUEUE_CTRL)task->mqueues)->q_access[mq -1].usp_notifier = usp_notification;
00833     rt_copy_to_user(usp_notification, &rt_pqueue_descr[mq -1].notify.data, sizeof(struct sigevent));
00834     mq_mutex_unlock(&rt_pqueue_descr[mq -1].mutex);
00835     return 0;
00836 }
00837 
00838 RTAI_SYSCALL_MODE int _mq_notify(mqd_t mq, RT_TASK *task, long space, long rem, const struct sigevent *notification)
00839 {
00840     int q_index = mq - 1;
00841     int rtn;
00842     if (q_index < 0 || q_index >= MAX_PQUEUES) {
00843         return -EBADF;
00844     }
00845     if (rem) {
00846         if (rt_pqueue_descr[q_index].notify.task == task) {
00847             mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00848             rt_pqueue_descr[q_index].notify.task = NULL;
00849             mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00850             return OK;
00851         } else {
00852             return -EBUSY;
00853         }
00854     }
00855     if (!space && !task->rt_signals) {
00856         rt_request_signal_mq(mq);
00857     } else if (!space && !((struct rt_signal_t *)task->rt_signals)[MAXSIGNALS + mq].sigtask) {
00858         rt_request_signal_mq(mq);
00859     }
00860     if (!space && (notification->sigev_notify != SIGEV_THREAD)){
00861         return ERROR;
00862     }
00863     mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00864     if (rt_pqueue_descr[q_index].notify.task == NULL) {
00865             rt_pqueue_descr[q_index].notify.task = task;
00866             rt_pqueue_descr[q_index].notify.data = *notification;
00867             if (space) {
00868                 if (((QUEUE_CTRL)task->mqueues)->q_access[mq -1].usp_notifier) {
00869                     rt_copy_to_user(((QUEUE_CTRL)task->mqueues)->q_access[mq -1].usp_notifier, &rt_pqueue_descr[mq -1].notify.data, sizeof(struct sigevent));
00870                     rtn = OK;
00871                 } else {
00872                     rtn = O_NOTIFY_NP;
00873                 }
00874             } else {
00875                 rtn = OK;
00876             }
00877     } else {
00878         rtn = -EBUSY;
00879     }
00880     mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00881     return rtn;
00882 }
00883 EXPORT_SYMBOL(_mq_notify);
00884 
00885 RTAI_SYSCALL_MODE int mq_unlink(char *mq_name)
00886 {
00887     int q_index, rtn;
00888 
00889     mq_mutex_lock(&pqueue_mutex);
00890     q_index = name_to_id(mq_name);
00891 
00892     if (q_index < 0) {
00893         mq_mutex_unlock(&pqueue_mutex);
00894         return -ENOENT;
00895     }
00896     mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00897     if (rt_pqueue_descr[q_index].open_count > 0) {
00898         strcpy(rt_pqueue_descr[q_index].q_name, "\0");
00899         rt_pqueue_descr[q_index].marked_for_deletion = TRUE;
00900         rtn = rt_pqueue_descr[q_index].open_count;
00901     } else {
00902         delete_queue(q_index);
00903         rtn = OK;
00904     }
00905     mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00906     mq_mutex_unlock(&pqueue_mutex);
00907     return rtn;
00908 }
00909 EXPORT_SYMBOL(mq_unlink);
00910 
00911 ///////////////////////////////////////////////////////////////////////////////
00912 //      PROC FILESYSTEM SECTION
00913 ///////////////////////////////////////////////////////////////////////////////
00914 
00915 #ifdef CONFIG_PROC_FS
00916 
00917 static int pqueue_read_proc(char *page, char **start, off_t off, int count,
00918                 int *eof, void *data)
00919 {
00920 PROC_PRINT_VARS;
00921 int ind;
00922 
00923     PROC_PRINT("\nRTAI Posix Queue Status\n");
00924     PROC_PRINT("-----------------------\n\n");
00925     PROC_PRINT("MAX_PQUEUES = %2d (system wide)\n", MAX_PQUEUES);
00926     PROC_PRINT("MQ_OPEN_MAX = %2d (per RT task)\n", MQ_OPEN_MAX);
00927     PROC_PRINT("MQ_NAME_MAX = %d\n", MQ_NAME_MAX);
00928 
00929     PROC_PRINT("\nID  NOpen  NMsgs  MaxMsgs  MaxSz  Perms  Del  Name\n");
00930     PROC_PRINT("--------------------------------------------------------------------------------\n");
00931     for (ind = 0; ind < MAX_PQUEUES; ind++) {
00932     if (rt_pqueue_descr[ind].q_name[0] || rt_pqueue_descr[ind].open_count) {
00933         PROC_PRINT( "%-3d %-6d ",
00934             rt_pqueue_descr[ind].q_id,
00935             rt_pqueue_descr[ind].open_count
00936             );
00937         PROC_PRINT( "%-6ld %-6ld   %-5ld  ",
00938                 rt_pqueue_descr[ind].data.attrs.mq_curmsgs,
00939             rt_pqueue_descr[ind].data.attrs.mq_maxmsg,
00940             rt_pqueue_descr[ind].data.attrs.mq_msgsize
00941             );
00942         PROC_PRINT( "%-4o   %c    %s\n",
00943             rt_pqueue_descr[ind].permissions,
00944             rt_pqueue_descr[ind].marked_for_deletion ? '*' : ' ',
00945             rt_pqueue_descr[ind].q_name
00946             );
00947     }
00948     }
00949     PROC_PRINT_DONE;
00950 }
00951 
00952 static struct proc_dir_entry *proc_rtai_pqueue;
00953 
00954 static int pqueue_proc_register(void)
00955 {
00956     proc_rtai_pqueue = create_proc_entry("pqueue", 0, rtai_proc_root);
00957     proc_rtai_pqueue->read_proc = pqueue_read_proc;
00958     return 0;
00959 }
00960 
00961 static int pqueue_proc_unregister(void)
00962 {
00963     remove_proc_entry("pqueue", rtai_proc_root);
00964     return 0;
00965 }
00966 #endif
00967 
00968 ///////////////////////////////////////////////////////////////////////////////
00969 //      MODULE CONTROL
00970 ///////////////////////////////////////////////////////////////////////////////
00971 
00972 struct rt_native_fun_entry rt_pqueue_entries[] = {
00973     { { UR1(1, 5) | UR2(4, 6), _mq_open },              MQ_OPEN },
00974         { { 1, _mq_receive },                       MQ_RECEIVE },
00975         { { 1, _mq_send },                          MQ_SEND },
00976         { { 1, mq_close },                              MQ_CLOSE },
00977         { { UW1(2, 3), mq_getattr },                    MQ_GETATTR },
00978         { { UR1(2, 4) | UW1(3, 4), mq_setattr },    MQ_SETATTR },
00979         { { UR1(5, 6), _mq_notify },                     MQ_NOTIFY },
00980         { { UR1(1, 2), mq_unlink },                     MQ_UNLINK },      
00981         { { 1, _mq_timedreceive },          MQ_TIMEDRECEIVE },
00982         { { 1, _mq_timedsend },                 MQ_TIMEDSEND },
00983         { { 1,  mq_reg_usp_notifier },              MQ_REG_USP_NOTIFIER },
00984     { { 0, 0 },                             000 }
00985 };
00986 
00987 extern int set_rt_fun_entries(struct rt_native_fun_entry *entry);
00988 extern void reset_rt_fun_entries(struct rt_native_fun_entry *entry);
00989 
00990 int __rtai_mq_init(void) 
00991 {
00992     num_pqueues = 0;
00993     mq_mutex_init(&pqueue_mutex, NULL);
00994 #ifdef CONFIG_PROC_FS
00995     pqueue_proc_register();
00996 #endif
00997     printk(KERN_INFO "RTAI[mq]: loaded.\n");
00998     return set_rt_fun_entries(rt_pqueue_entries);
00999     return OK;
01000 }
01001 
01002 void __rtai_mq_exit(void) 
01003 {
01004     mq_mutex_destroy(&pqueue_mutex);
01005     reset_rt_fun_entries(rt_pqueue_entries);
01006 #ifdef CONFIG_PROC_FS
01007     pqueue_proc_unregister();
01008 #endif
01009     printk(KERN_INFO "RTAI[mq]: unloaded.\n");
01010 }
01011 
01012 #ifndef CONFIG_RTAI_MQ_BUILTIN
01013 module_init(__rtai_mq_init);
01014 module_exit(__rtai_mq_exit);
01015 #endif /* !CONFIG_RTAI_MQ_BUILTIN */

Generated on Tue Feb 2 17:46:05 2010 for RTAI API by  doxygen 1.4.7