00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <linux/module.h>
00031 #include <linux/kernel.h>
00032 #include <linux/version.h>
00033 #include <linux/errno.h>
00034 #include <linux/stat.h>
00035 #include <asm/uaccess.h>
00036 #ifdef CONFIG_PROC_FS
00037 #include <linux/proc_fs.h>
00038 extern struct proc_dir_entry *rtai_proc_root;
00039 #endif
00040 #include <rtai_schedcore.h>
00041 #include <rtai_proc_fs.h>
00042 #include <rtai_signal.h>
00043
00044 MODULE_LICENSE("GPL");
00045
/*
 * Synchronisation shims.
 *
 * The queue code below is written against a small mutex / condition
 * vocabulary; these macros map that vocabulary onto RTAI semaphores (SEM).
 *
 * CAUTION: mq_mutex_lock, mq_mutex_timedlock, mq_cond_wait and
 * mq_cond_timedwait expand to code that contains `return`.  When the
 * underlying semaphore call reports an error they leave the *enclosing
 * function* with -EBADF (or -ETIMEDOUT for the timed forms on RTE_TIMOUT).
 * The macros themselves release nothing on that path, so any other lock the
 * caller holds at that point stays held.
 */
#define mq_mutex_t SEM
#define mq_cond_t  SEM

/* Mutex: a priority-queued binary semaphore, initially available. */
#define mq_mutex_init(mutex, attr) rt_typed_sem_init(mutex, 1, BIN_SEM | PRIO_Q)
#define mq_mutex_destroy rt_sem_delete
#define mq_mutex_unlock  rt_sem_signal
#define mq_mutex_trylock rt_sem_wait_if

/* Take the mutex; returns -EBADF from the enclosing function on failure. */
#define mq_mutex_lock(mutex) \
do { \
	if (abs(rt_sem_wait(mutex)) >= RTE_LOWERR) { \
		return -EBADF; \
	} \
} while (0)

/* Take the mutex, giving up at the absolute time `abstime` (struct timespec *). */
#define mq_mutex_timedlock(mutex, abstime) \
do { \
	RTIME mq_deadline_ = timespec2count(abstime); \
	int mq_rc_ = rt_sem_wait_until(mutex, mq_deadline_); \
	if (abs(mq_rc_) >= RTE_LOWERR) { \
		return mq_rc_ == RTE_TIMOUT ? -ETIMEDOUT : -EBADF; \
	} \
} while (0)

/* Condition: a semaphore created with an initial count of zero. */
#define mq_cond_init(cond, attr) rt_sem_init(cond, 0)
#define mq_cond_destroy rt_sem_delete
#define mq_cond_signal  rt_sem_signal

/*
 * Release `mutex`, wait on `cond`, then take `mutex` again.  If the wait on
 * `cond` fails the enclosing function returns without the mutex; if taking
 * the mutex back fails, `cond` is signalled again before returning.
 */
#define mq_cond_wait(cond, mutex) \
do { \
	rt_sem_signal(mutex); \
	if (abs(rt_sem_wait(cond)) >= RTE_LOWERR) { \
		return -EBADF; \
	} \
	if (abs(rt_sem_wait(mutex)) >= RTE_LOWERR) { \
		rt_sem_signal(cond); \
		return -EBADF; \
	} \
} while (0)

/* As mq_cond_wait, but both waits give up at the absolute time `abstime`. */
#define mq_cond_timedwait(cond, mutex, abstime) \
do { \
	RTIME mq_deadline_ = timespec2count(abstime); \
	int mq_rc_; \
	rt_sem_signal(mutex); \
	mq_rc_ = rt_sem_wait_until(cond, mq_deadline_); \
	if (abs(mq_rc_) >= RTE_LOWERR) { \
		return mq_rc_ == RTE_TIMOUT ? -ETIMEDOUT : -EBADF; \
	} \
	mq_rc_ = rt_sem_wait_until(mutex, mq_deadline_); \
	if (abs(mq_rc_) >= RTE_LOWERR) { \
		rt_sem_signal(cond); \
		return mq_rc_ == RTE_TIMOUT ? -ETIMEDOUT : -EBADF; \
	} \
} while (0)
00093
/* Generic success / failure codes, supplied only if nothing defined them yet. */
#if !defined(OK)
#define OK 0
#endif
#if !defined(ERROR)
#define ERROR -1
#endif

/* Number of entries in task_pqueue_access[], i.e. how many tasks can own a
 * per-task queue access table at the same time. */
#define MAX_RT_TASKS 128
00106
00107
00108
00109
00110
00111 static int num_pqueues = 0;
00112 static struct _pqueue_descr_struct rt_pqueue_descr[MAX_PQUEUES] = {{0}};
00113 static struct _pqueue_access_struct task_pqueue_access[MAX_RT_TASKS] = {{0}};
00114 static MQ_ATTR default_queue_attrs = { MAX_MSGS, MAX_MSGSIZE, MQ_NONBLOCK, 0 };
00115
00116 static mq_mutex_t pqueue_mutex;
00117
00118
00119
00120
00121
00122 static int name_to_id(char *name)
00123 {
00124 int ind;
00125 for (ind = 0; ind < MAX_PQUEUES; ind++) {
00126 if (rt_pqueue_descr[ind].q_name[0] && !strcmp(rt_pqueue_descr[ind].q_name, name)) {
00127 return ind;
00128 }
00129 }
00130 return ERROR;
00131 }
00132
00133
00134 static inline mq_bool_t is_empty(struct queue_control *q)
00135 {
00136 return !q->attrs.mq_curmsgs;
00137 }
00138
00139
00140 static inline mq_bool_t is_full(struct queue_control *q)
00141 {
00142 return q->attrs.mq_curmsgs == q->attrs.mq_maxmsg;
00143 }
00144
00145
00146 static inline MSG_HDR* getnode(Q_CTRL *queue)
00147 {
00148 return queue->attrs.mq_curmsgs < queue->attrs.mq_maxmsg ? queue->nodes[queue->attrs.mq_curmsgs++] : NULL;
00149 }
00150
00151 static inline int freenode(void *node, Q_CTRL *queue)
00152 {
00153 if (queue->attrs.mq_curmsgs > 0) {
00154 queue->nodes[--queue->attrs.mq_curmsgs] = node;
00155 return 0;
00156 }
00157 return -EINVAL;
00158 }
00159
00160
00161 static void insert_message(Q_CTRL *q, MSG_HDR *this_msg)
00162 {
00163
00164
00165
00166
00167
00168 MSG_HDR *prev, *insertpt;
00169
00170
00171
00172
00173
00174
00175 if (((MSG_HDR *)q->tail)->priority >= this_msg->priority) {
00176 ((MSG_HDR*)q->tail)->next = this_msg;
00177 q->tail = this_msg;
00178 } else {
00179 prev = insertpt = q->head;
00180
00181
00182 while (insertpt->priority >= this_msg->priority) {
00183 prev = insertpt;
00184 insertpt = insertpt->next;
00185 }
00186
00187
00188 if (insertpt == q->head) {
00189 this_msg->next = q->head;
00190 q->head = this_msg;
00191 } else {
00192 this_msg->next = prev->next;
00193 prev->next = this_msg;
00194 }
00195 }
00196 }
00197
00198 #undef mqueues
00199 #define mqueues system_data_ptr
00200
00201 static mq_bool_t is_blocking(MSG_QUEUE *q)
00202 {
00203 int q_ind;
00204 struct _pqueue_access_data *aces;
00205
00206 aces = ((QUEUE_CTRL)_rt_whoami()->mqueues)->q_access;
00207 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00208 if (aces[q_ind].q_id == q->q_id) {
00209 return !(aces[q_ind].oflags & O_NONBLOCK);
00210 }
00211 }
00212 return FALSE;
00213 }
00214
00215
00216 static mq_bool_t can_access(MSG_QUEUE *q, Q_ACCESS access)
00217 {
00218 RT_TASK *caller = _rt_whoami();
00219
00220 if (q->owner == caller ? (((access == FOR_READ) && (q->permissions & S_IRUSR)) || ((access == FOR_WRITE) && (q->permissions & S_IWUSR))) : (((access == FOR_READ) && (q->permissions & S_IRGRP)) || ((access == FOR_WRITE) && (q->permissions & S_IWGRP)))) {
00221 int q_ind;
00222 struct _pqueue_access_data *aces;
00223 struct _pqueue_access_struct *task_queue_data_ptr;
00224 int q_access_flags = 0;
00225
00226 task_queue_data_ptr = (QUEUE_CTRL)caller->mqueues;
00227 if (task_queue_data_ptr == NULL) {
00228 return FALSE;
00229 }
00230 aces = task_queue_data_ptr->q_access;
00231 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00232 if (aces[q_ind].q_id == q->q_id) {
00233 q_access_flags = aces[q_ind].oflags;
00234 goto set_mode;
00235 }
00236 }
00237 return FALSE;
00238 set_mode: if (access == FOR_WRITE) {
00239 if ((q_access_flags & O_WRONLY) || (q_access_flags & O_RDWR)) {
00240 return TRUE;
00241 }
00242 } else {
00243 return TRUE;
00244 }
00245 }
00246 return FALSE;
00247 }
00248
00249
00250 static inline void initialise_queue(Q_CTRL *q)
00251 {
00252 int msg_size, msg_ind;
00253 void *msg_ptr;
00254
00255 msg_size = q->attrs.mq_msgsize + sizeof(MSG_HDR);
00256 msg_ptr = q->base;
00257 q->nodes = msg_ptr + msg_size*q->attrs.mq_maxmsg;
00258 for (msg_ind = 0; msg_ind < q->attrs.mq_maxmsg; msg_ind++) {
00259 q->nodes[msg_ind] = msg_ptr;
00260 ((MSG_HDR *)msg_ptr)->size = 0;
00261 ((MSG_HDR *)msg_ptr)->priority = MQ_MIN_MSG_PRIORITY;
00262 ((MSG_HDR *)msg_ptr)->next = NULL;
00263 msg_ptr += msg_size;
00264 }
00265 }
00266
00267
00268 static void delete_queue(int q_index)
00269 {
00270 rt_free(rt_pqueue_descr[q_index].data.base);
00271
00272 rt_pqueue_descr[q_index].owner = NULL;
00273 rt_pqueue_descr[q_index].open_count = 0;
00274 strcpy(rt_pqueue_descr[q_index].q_name, "\0");
00275 rt_pqueue_descr[q_index].q_id = INVALID_PQUEUE;
00276 rt_pqueue_descr[q_index].data.base = NULL;
00277 rt_pqueue_descr[q_index].data.head = NULL;
00278 rt_pqueue_descr[q_index].data.tail = NULL;
00279 rt_pqueue_descr[q_index].data.attrs = (MQ_ATTR){ 0, 0, 0, 0 };
00280 rt_pqueue_descr[q_index].permissions = 0;
00281
00282 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00283 mq_mutex_destroy(&rt_pqueue_descr[q_index].mutex);
00284 mq_cond_destroy(&rt_pqueue_descr[q_index].emp_cond);
00285 mq_cond_destroy(&rt_pqueue_descr[q_index].full_cond);
00286
00287 if (num_pqueues > 0) {
00288 num_pqueues--;
00289 }
00290 }
00291
00292 static void signal_suprt_fun_mq(void *fun_arg)
00293 {
00294 struct suprt_fun_arg { RT_TASK *sigtask; RT_TASK *task; mqd_t mq; } arg = *(struct suprt_fun_arg *)fun_arg;
00295
00296 arg.sigtask = RT_CURRENT;
00297 if (!rt_request_signal_(arg.sigtask, arg.task, (arg.mq + MAXSIGNALS))) {
00298 while (rt_wait_signal(arg.sigtask, arg.task)) {
00299 rt_pqueue_descr[arg.mq - 1].notify.data._sigev_un._sigev_thread._function((sigval_t)rt_pqueue_descr[arg.mq - 1].notify.data.sigev_value.sival_ptr);
00300 }
00301 } else {
00302 rt_task_resume(arg.task);
00303 }
00304 }
00305
00306 int rt_request_signal_mq(mqd_t mq)
00307 {
00308 RT_TASK *sigtask;
00309 struct suprt_fun_arg { RT_TASK *sigtask; RT_TASK *task; mqd_t mq; } arg = { NULL, rt_whoami(), mq };
00310 if ((sigtask = rt_malloc(sizeof(RT_TASK)))) {
00311 rt_task_init_cpuid(sigtask, (void *)signal_suprt_fun_mq, (long)&arg, SIGNAL_TASK_STACK_SIZE, arg.task->priority, 0, 0, RT_CURRENT->runnable_on_cpus);
00312 rt_task_resume(sigtask);
00313 rt_task_suspend(arg.task);
00314 return arg.task->retval;
00315 }
00316 return -EINVAL;
00317 }
00318
00319
00320
00321
00322
00323 RTAI_SYSCALL_MODE mqd_t _mq_open(char *mq_name, int oflags, mode_t permissions, struct mq_attr *mq_attr, long space)
00324 {
00325 int q_index, t_index, q_ind;
00326 int spare_count = 0, first_spare = 0;
00327 mq_bool_t q_found = FALSE;
00328 RT_TASK *this_task = _rt_whoami();
00329 struct _pqueue_access_struct *task_data_ptr;
00330
00331 task_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00332
00333 mq_mutex_lock(&pqueue_mutex);
00334 if ((q_index = name_to_id(mq_name)) >= 0) {
00335
00336
00337
00338 if ((oflags & O_CREAT) && (oflags & O_EXCL)) {
00339 mq_mutex_unlock(&pqueue_mutex);
00340 return -EEXIST;
00341 }
00342 if (task_data_ptr == NULL) {
00343 for (t_index = 0; t_index < MAX_RT_TASKS; t_index++) {
00344 if (task_pqueue_access[t_index].this_task == NULL) {
00345 task_data_ptr = &(task_pqueue_access[t_index]);
00346 task_data_ptr->this_task = this_task;
00347 this_task->mqueues = task_data_ptr;
00348 break;
00349 }
00350 }
00351 if (t_index == MAX_RT_TASKS) {
00352 mq_mutex_unlock(&pqueue_mutex);
00353 return -ENOMEM;
00354 }
00355 }
00356
00357
00358
00359
00360
00361 mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00362 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00363 if (task_data_ptr->q_access[q_ind].q_id == rt_pqueue_descr[q_index].q_id) {
00364 q_found = TRUE;
00365 break;
00366 } else if(task_data_ptr->q_access[q_ind].q_id == INVALID_PQUEUE) {
00367 if (spare_count == 0) {
00368 first_spare = q_ind;
00369 }
00370 spare_count++;
00371 }
00372 }
00373
00374
00375 if (!q_found && spare_count == 0) {
00376 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00377 mq_mutex_unlock(&pqueue_mutex);
00378 return -EINVAL;
00379 }
00380
00381
00382 if (!q_found) {
00383
00384 task_data_ptr->n_open_pqueues++;
00385 q_ind = first_spare;
00386 }
00387 task_data_ptr->q_access[q_ind].q_id = rt_pqueue_descr[q_index].q_id;
00388 task_data_ptr->q_access[q_ind].oflags = oflags;
00389 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00390 } else if (oflags & O_CREAT) {
00391
00392
00393
00394 if(num_pqueues >= MAX_PQUEUES) {
00395 mq_mutex_unlock(&pqueue_mutex);
00396 return -ENOMEM;
00397 }
00398
00399 if( strlen(mq_name) >= MQ_NAME_MAX) {
00400 mq_mutex_unlock(&pqueue_mutex);
00401 return -ENAMETOOLONG;
00402 }
00403
00404
00405
00406
00407 if (task_data_ptr == NULL) {
00408
00409 for (t_index = 0; t_index < MAX_RT_TASKS; t_index++) {
00410 if (task_pqueue_access[t_index].this_task == NULL) {
00411 task_data_ptr = &task_pqueue_access[t_index];
00412 task_data_ptr->this_task = this_task;
00413 this_task->mqueues = task_data_ptr;
00414 break;
00415 }
00416 }
00417 if (t_index == MAX_RT_TASKS) {
00418 mq_mutex_unlock(&pqueue_mutex);
00419 return -ENOMEM;
00420 }
00421 } else if (task_data_ptr->n_open_pqueues >= MQ_OPEN_MAX) {
00422 mq_mutex_unlock(&pqueue_mutex);
00423 return -EINVAL;
00424 }
00425
00426 if (mq_attr == NULL) {
00427 mq_attr = &default_queue_attrs;
00428 }
00429
00430 for (q_index = 0; q_index < MAX_PQUEUES; q_index++) {
00431 if (rt_pqueue_descr[q_index].q_id == INVALID_PQUEUE) {
00432 int msg_size, queue_size;
00433 void *mem_ptr;
00434
00435 msg_size = mq_attr->mq_msgsize + sizeof(MSG_HDR);
00436 queue_size = (msg_size + sizeof(void *))*mq_attr->mq_maxmsg;
00437 mem_ptr = rt_malloc(queue_size);
00438 if(mem_ptr == NULL) {
00439 mq_mutex_unlock(&pqueue_mutex);
00440 return -ENOMEM;
00441 }
00442 rt_pqueue_descr[q_index].data.base = mem_ptr;
00443
00444 rt_pqueue_descr[q_index].owner = this_task;
00445 rt_pqueue_descr[q_index].open_count = 0;
00446 strcpy(rt_pqueue_descr[q_index].q_name, mq_name);
00447 rt_pqueue_descr[q_index].q_id = q_index + 1;
00448 rt_pqueue_descr[q_index].marked_for_deletion = FALSE;
00449 rt_pqueue_descr[q_index].data.head =
00450 rt_pqueue_descr[q_index].data.tail = rt_pqueue_descr[q_index].data.base;
00451 rt_pqueue_descr[q_index].data.attrs = *(mq_attr);
00452 rt_pqueue_descr[q_index].data.attrs.mq_curmsgs = 0;
00453 rt_pqueue_descr[q_index].permissions = permissions;
00454
00455 mq_cond_init(&rt_pqueue_descr[q_index].emp_cond, NULL);
00456 mq_cond_init(&rt_pqueue_descr[q_index].full_cond, NULL);
00457 mq_mutex_init(&rt_pqueue_descr[q_index].mutex, NULL);
00458
00459
00460 initialise_queue(&rt_pqueue_descr[q_index].data);
00461
00462 q_ind = task_data_ptr->n_open_pqueues++;
00463 task_data_ptr->q_access[q_ind].q_id = q_index + 1;
00464 task_data_ptr->q_access[q_ind].oflags = oflags;
00465 break;
00466 }
00467 }
00468 if(q_index >= MAX_PQUEUES) {
00469 mq_mutex_unlock(&pqueue_mutex);
00470 return -EMFILE;
00471 }
00472 num_pqueues++;
00473 } else {
00474
00475
00476
00477 mq_mutex_unlock(&pqueue_mutex);
00478 return -ENOENT;
00479 }
00480
00481
00482 rt_pqueue_descr[q_index].open_count++;
00483 mq_mutex_unlock(&pqueue_mutex);
00484
00485
00486 if ((oflags & O_NOTIFY_NP) && space == 0) {
00487 rt_request_signal_mq(rt_pqueue_descr[q_index].q_id);
00488 }
00489
00490 return (mqd_t)rt_pqueue_descr[q_index].q_id;
00491 }
00492 EXPORT_SYMBOL(_mq_open);
00493
00494 RTAI_SYSCALL_MODE size_t _mq_receive(mqd_t mq, char *msg_buffer, size_t buflen, unsigned int *msgprio, int space)
00495 {
00496 int q_index = mq - 1, size;
00497 MQMSG *msg_ptr;
00498 MSG_QUEUE *q;
00499
00500 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00501 return -EBADF;
00502 }
00503 q = &rt_pqueue_descr[q_index];
00504 if (buflen < q->data.attrs.mq_msgsize) {
00505 return -EMSGSIZE;
00506 }
00507 if (can_access(q, FOR_READ) == FALSE) {
00508 return -EINVAL;
00509 }
00510 if (is_blocking(q)) {
00511 mq_mutex_lock(&q->mutex);
00512 } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00513 return -EAGAIN;
00514 }
00515 while (is_empty(&q->data)) {
00516 if (is_blocking(q)) {
00517 mq_cond_wait(&q->emp_cond, &q->mutex);
00518 } else {
00519 mq_mutex_unlock(&q->mutex);
00520 return -EAGAIN;
00521 }
00522 }
00523 msg_ptr = q->data.head;
00524 if (msg_ptr->hdr.size <= buflen) {
00525 size = msg_ptr->hdr.size;
00526 if (space) {
00527 memcpy(msg_buffer, &msg_ptr->data, size);
00528 if (msgprio) {
00529 *msgprio = msg_ptr->hdr.priority;
00530 }
00531 } else {
00532 rt_copy_to_user(msg_buffer, &msg_ptr->data, size);
00533 if (msgprio) {
00534 rt_put_user(msg_ptr->hdr.priority, msgprio);
00535 }
00536 }
00537 } else {
00538 size = ERROR;
00539 }
00540 q->data.head = msg_ptr->hdr.next;
00541 msg_ptr->hdr.size = 0;
00542 msg_ptr->hdr.next = NULL;
00543 freenode(msg_ptr, &q->data);
00544 if(q->data.head == NULL) {
00545 q->data.head = q->data.tail = q->data.nodes[0];
00546 }
00547 mq_cond_signal(&q->full_cond);
00548 mq_mutex_unlock(&q->mutex);
00549 return size;
00550 }
00551 EXPORT_SYMBOL(_mq_receive);
00552
00553 RTAI_SYSCALL_MODE size_t _mq_timedreceive(mqd_t mq, char *msg_buffer, size_t buflen, unsigned int *msgprio, const struct timespec *abstime, int space)
00554 {
00555 int q_index = mq - 1, size;
00556 MQMSG *msg_ptr;
00557 MSG_QUEUE *q;
00558 struct timespec time;
00559
00560 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00561 return -EBADF;
00562 }
00563 q = &rt_pqueue_descr[q_index];
00564 if (buflen < q->data.attrs.mq_msgsize) {
00565 return -EMSGSIZE;
00566 }
00567 if (can_access(q, FOR_READ) == FALSE) {
00568 return -EINVAL;
00569 }
00570 if (!space) {
00571 rt_copy_from_user(&time, abstime, sizeof(struct timespec));
00572 abstime = &time;
00573 }
00574 if (is_blocking(q)) {
00575 mq_mutex_timedlock(&q->mutex, abstime);
00576 } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00577 return -EAGAIN;
00578 }
00579 while (is_empty(&q->data)) {
00580 if (is_blocking(q)) {
00581 mq_cond_timedwait(&q->emp_cond, &q->mutex, abstime);
00582 } else {
00583 return -EAGAIN;
00584 }
00585 }
00586 msg_ptr = q->data.head;
00587 if (msg_ptr->hdr.size <= buflen) {
00588 size = msg_ptr->hdr.size;
00589 if (space) {
00590 memcpy(msg_buffer, &msg_ptr->data, size);
00591 if (msgprio) {
00592 *msgprio = msg_ptr->hdr.priority;
00593 }
00594 } else {
00595 rt_copy_to_user(msg_buffer, &msg_ptr->data, size);
00596 if (msgprio) {
00597 rt_put_user(msg_ptr->hdr.priority, msgprio);
00598 }
00599 }
00600 } else {
00601 size = ERROR;
00602 }
00603 q->data.head = msg_ptr->hdr.next;
00604 msg_ptr->hdr.size = 0;
00605 msg_ptr->hdr.next = NULL;
00606 freenode(msg_ptr, &q->data);
00607 if(q->data.head == NULL) {
00608 q->data.head = q->data.tail = q->data.nodes[0];
00609 }
00610 mq_cond_signal(&q->full_cond);
00611 mq_mutex_unlock(&q->mutex);
00612 return size;
00613 }
00614 EXPORT_SYMBOL(_mq_timedreceive);
00615
00616 RTAI_SYSCALL_MODE int _mq_send(mqd_t mq, const char *msg, size_t msglen, unsigned int msgprio, int space)
00617 {
00618 int q_index = mq - 1;
00619 MSG_QUEUE *q;
00620 MSG_HDR *this_msg;
00621 mq_bool_t q_was_empty;
00622
00623 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00624 return -EBADF;
00625 }
00626 q = &rt_pqueue_descr[q_index];
00627 if( can_access(q, FOR_WRITE) == FALSE) {
00628 return -EINVAL;
00629 }
00630 if(msgprio > MQ_PRIO_MAX) {
00631 return -EINVAL;
00632 }
00633 if (is_blocking(q)) {
00634 mq_mutex_lock(&q->mutex);
00635 } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00636 return -EAGAIN;
00637 }
00638 q_was_empty = is_empty(&q->data);
00639 while (is_full(&q->data)) {
00640 if (is_blocking(q)) {
00641 mq_cond_wait(&q->full_cond, &q->mutex);
00642 } else {
00643 mq_mutex_unlock(&q->mutex);
00644 return -EAGAIN;
00645 }
00646 }
00647 if( (this_msg = getnode(&q->data)) == NULL) {
00648 mq_mutex_unlock(&q->mutex);
00649 return -ENOBUFS;
00650 }
00651 if (msglen > q->data.attrs.mq_msgsize) {
00652 mq_mutex_unlock(&q->mutex);
00653 return -EMSGSIZE;
00654 }
00655 this_msg->size = msglen;
00656 this_msg->priority = msgprio;
00657 if (space) {
00658 memcpy(&((MQMSG *)this_msg)->data, msg, msglen);
00659 } else {
00660 rt_copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen);
00661 }
00662 insert_message(&q->data, this_msg);
00663 mq_cond_signal(&q->emp_cond);
00664 if(q_was_empty && rt_pqueue_descr[q_index].notify.task != NULL) {
00665 rt_trigger_signal((MAXSIGNALS + mq), rt_pqueue_descr[q_index].notify.task);
00666 rt_pqueue_descr[q_index].notify.task = NULL;
00667 }
00668 mq_mutex_unlock(&q->mutex);
00669 return msglen;
00670 }
00671 EXPORT_SYMBOL(_mq_send);
00672
00673 RTAI_SYSCALL_MODE int _mq_timedsend(mqd_t mq, const char *msg, size_t msglen, unsigned int msgprio, const struct timespec *abstime, int space)
00674 {
00675 int q_index = mq - 1;
00676 MSG_QUEUE *q;
00677 MSG_HDR *this_msg;
00678 mq_bool_t q_was_empty;
00679 struct timespec time;
00680
00681 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00682 return -EBADF;
00683 }
00684 q = &rt_pqueue_descr[q_index];
00685 if (can_access(q, FOR_WRITE) == FALSE) {
00686 return -EINVAL;
00687 }
00688 if (msgprio > MQ_PRIO_MAX) {
00689 return -EINVAL;
00690 }
00691 if (!space) {
00692 rt_copy_from_user(&time, abstime, sizeof(struct timespec));
00693 abstime = &time;
00694 }
00695 if (is_blocking(q)) {
00696 mq_mutex_timedlock(&q->mutex, abstime);
00697 } else if (mq_mutex_trylock(&q->mutex) <= 0) {
00698 return -EAGAIN;
00699 }
00700 q_was_empty = is_empty(&q->data);
00701 while (is_full(&q->data)) {
00702 if (is_blocking(q)) {
00703 mq_cond_timedwait(&q->full_cond, &q->mutex, abstime);
00704 } else {
00705 mq_mutex_unlock(&q->mutex);
00706 return -EAGAIN;
00707 }
00708 }
00709 if ((this_msg = getnode(&q->data)) == NULL) {
00710 mq_mutex_unlock(&q->mutex);
00711 return -ENOBUFS;
00712 }
00713 if (msglen > q->data.attrs.mq_msgsize) {
00714 mq_mutex_unlock(&q->mutex);
00715 return -EMSGSIZE;
00716 }
00717 this_msg->size = msglen;
00718 this_msg->priority = msgprio;
00719 if (space) {
00720 memcpy(&((MQMSG *)this_msg)->data, msg, msglen);
00721 } else {
00722 rt_copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen);
00723 }
00724 insert_message(&q->data, this_msg);
00725 mq_cond_signal(&q->emp_cond);
00726 if (q_was_empty && rt_pqueue_descr[q_index].notify.task != NULL) {
00727 rt_trigger_signal((MAXSIGNALS + mq), rt_pqueue_descr[q_index].notify.task);
00728 rt_pqueue_descr[q_index].notify.task = NULL;
00729 }
00730 mq_mutex_unlock(&q->mutex);
00731 return msglen;
00732
00733 }
00734 EXPORT_SYMBOL(_mq_timedsend);
00735
00736 RTAI_SYSCALL_MODE int mq_close(mqd_t mq)
00737 {
00738 int q_index = mq - 1;
00739 int q_ind;
00740 RT_TASK *this_task = _rt_whoami();
00741 struct _pqueue_access_struct *task_queue_data_ptr;
00742
00743 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00744 return -EINVAL;
00745 }
00746 task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00747 if (task_queue_data_ptr == NULL ) {
00748 return -EINVAL;
00749 }
00750 mq_mutex_lock(&pqueue_mutex);
00751 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00752 if (task_queue_data_ptr->q_access[q_ind].q_id == mq) {
00753 task_queue_data_ptr->q_access[q_ind].q_id = INVALID_PQUEUE;
00754 task_queue_data_ptr->q_access[q_ind].usp_notifier = NULL;
00755 rt_release_signal((mq + MAXSIGNALS), task_queue_data_ptr->this_task);
00756 task_queue_data_ptr->n_open_pqueues--;
00757 break;
00758 }
00759 }
00760 if (q_ind == MQ_OPEN_MAX) {
00761 mq_mutex_unlock(&pqueue_mutex);
00762 return -EINVAL;
00763 }
00764 mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00765 if (rt_pqueue_descr[q_index].notify.task == this_task) {
00766 rt_pqueue_descr[q_index].notify.task = NULL;
00767 }
00768 if (--rt_pqueue_descr[q_index].open_count <= 0 &&
00769 rt_pqueue_descr[q_index].marked_for_deletion == TRUE ) {
00770 delete_queue(q_index);
00771 }
00772 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00773 mq_mutex_unlock(&pqueue_mutex);
00774 return OK;
00775 }
00776 EXPORT_SYMBOL(mq_close);
00777
00778 RTAI_SYSCALL_MODE int mq_getattr(mqd_t mq, struct mq_attr *attrbuf)
00779 {
00780 int q_index = mq - 1;
00781
00782 if (0 <= q_index && q_index < MAX_PQUEUES) {
00783 *attrbuf = rt_pqueue_descr[q_index].data.attrs;
00784 return OK;
00785 }
00786 return -EBADF;
00787 }
00788 EXPORT_SYMBOL(mq_getattr);
00789
00790 RTAI_SYSCALL_MODE int mq_setattr(mqd_t mq, const struct mq_attr *new_attrs, struct mq_attr *old_attrs)
00791 {
00792 int q_index = mq - 1;
00793 int q_ind;
00794 RT_TASK *this_task = _rt_whoami();
00795 struct _pqueue_access_struct *task_queue_data_ptr;
00796
00797 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00798 return -EBADF;
00799 }
00800 if (old_attrs != NULL) {
00801 *old_attrs = rt_pqueue_descr[q_index].data.attrs;
00802 }
00803 task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00804 if (task_queue_data_ptr == NULL) {
00805 return -EINVAL;
00806 }
00807 for (q_ind = 0; q_ind < MQ_OPEN_MAX; q_ind++) {
00808 if (task_queue_data_ptr->q_access[q_ind].q_id == mq) {
00809 if(new_attrs->mq_flags == MQ_NONBLOCK) {
00810 task_queue_data_ptr->q_access[q_ind].oflags |= O_NONBLOCK;
00811 } else if (new_attrs->mq_flags == MQ_BLOCK) {
00812 task_queue_data_ptr->q_access[q_ind].oflags &= ~O_NONBLOCK;
00813 } else {
00814 return -EINVAL;
00815 }
00816 break;
00817 }
00818 }
00819 if (q_ind == MQ_OPEN_MAX) {
00820 return -EINVAL;
00821 }
00822 mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00823 rt_pqueue_descr[q_index].data.attrs.mq_flags = new_attrs->mq_flags;
00824 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00825 return OK;
00826 }
00827 EXPORT_SYMBOL(mq_setattr);
00828
00829 RTAI_SYSCALL_MODE int mq_reg_usp_notifier(mqd_t mq, RT_TASK *task, struct sigevent *usp_notification)
00830 {
00831 mq_mutex_lock(&rt_pqueue_descr[mq -1].mutex);
00832 ((QUEUE_CTRL)task->mqueues)->q_access[mq -1].usp_notifier = usp_notification;
00833 rt_copy_to_user(usp_notification, &rt_pqueue_descr[mq -1].notify.data, sizeof(struct sigevent));
00834 mq_mutex_unlock(&rt_pqueue_descr[mq -1].mutex);
00835 return 0;
00836 }
00837
00838 RTAI_SYSCALL_MODE int _mq_notify(mqd_t mq, RT_TASK *task, long space, long rem, const struct sigevent *notification)
00839 {
00840 int q_index = mq - 1;
00841 int rtn;
00842 if (q_index < 0 || q_index >= MAX_PQUEUES) {
00843 return -EBADF;
00844 }
00845 if (rem) {
00846 if (rt_pqueue_descr[q_index].notify.task == task) {
00847 mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00848 rt_pqueue_descr[q_index].notify.task = NULL;
00849 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00850 return OK;
00851 } else {
00852 return -EBUSY;
00853 }
00854 }
00855 if (!space && !task->rt_signals) {
00856 rt_request_signal_mq(mq);
00857 } else if (!space && !((struct rt_signal_t *)task->rt_signals)[MAXSIGNALS + mq].sigtask) {
00858 rt_request_signal_mq(mq);
00859 }
00860 if (!space && (notification->sigev_notify != SIGEV_THREAD)){
00861 return ERROR;
00862 }
00863 mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00864 if (rt_pqueue_descr[q_index].notify.task == NULL) {
00865 rt_pqueue_descr[q_index].notify.task = task;
00866 rt_pqueue_descr[q_index].notify.data = *notification;
00867 if (space) {
00868 if (((QUEUE_CTRL)task->mqueues)->q_access[mq -1].usp_notifier) {
00869 rt_copy_to_user(((QUEUE_CTRL)task->mqueues)->q_access[mq -1].usp_notifier, &rt_pqueue_descr[mq -1].notify.data, sizeof(struct sigevent));
00870 rtn = OK;
00871 } else {
00872 rtn = O_NOTIFY_NP;
00873 }
00874 } else {
00875 rtn = OK;
00876 }
00877 } else {
00878 rtn = -EBUSY;
00879 }
00880 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00881 return rtn;
00882 }
00883 EXPORT_SYMBOL(_mq_notify);
00884
00885 RTAI_SYSCALL_MODE int mq_unlink(char *mq_name)
00886 {
00887 int q_index, rtn;
00888
00889 mq_mutex_lock(&pqueue_mutex);
00890 q_index = name_to_id(mq_name);
00891
00892 if (q_index < 0) {
00893 mq_mutex_unlock(&pqueue_mutex);
00894 return -ENOENT;
00895 }
00896 mq_mutex_lock(&rt_pqueue_descr[q_index].mutex);
00897 if (rt_pqueue_descr[q_index].open_count > 0) {
00898 strcpy(rt_pqueue_descr[q_index].q_name, "\0");
00899 rt_pqueue_descr[q_index].marked_for_deletion = TRUE;
00900 rtn = rt_pqueue_descr[q_index].open_count;
00901 } else {
00902 delete_queue(q_index);
00903 rtn = OK;
00904 }
00905 mq_mutex_unlock(&rt_pqueue_descr[q_index].mutex);
00906 mq_mutex_unlock(&pqueue_mutex);
00907 return rtn;
00908 }
00909 EXPORT_SYMBOL(mq_unlink);
00910
00911
00912
00913
00914
00915 #ifdef CONFIG_PROC_FS
00916
00917 static int pqueue_read_proc(char *page, char **start, off_t off, int count,
00918 int *eof, void *data)
00919 {
00920 PROC_PRINT_VARS;
00921 int ind;
00922
00923 PROC_PRINT("\nRTAI Posix Queue Status\n");
00924 PROC_PRINT("-----------------------\n\n");
00925 PROC_PRINT("MAX_PQUEUES = %2d (system wide)\n", MAX_PQUEUES);
00926 PROC_PRINT("MQ_OPEN_MAX = %2d (per RT task)\n", MQ_OPEN_MAX);
00927 PROC_PRINT("MQ_NAME_MAX = %d\n", MQ_NAME_MAX);
00928
00929 PROC_PRINT("\nID NOpen NMsgs MaxMsgs MaxSz Perms Del Name\n");
00930 PROC_PRINT("--------------------------------------------------------------------------------\n");
00931 for (ind = 0; ind < MAX_PQUEUES; ind++) {
00932 if (rt_pqueue_descr[ind].q_name[0] || rt_pqueue_descr[ind].open_count) {
00933 PROC_PRINT( "%-3d %-6d ",
00934 rt_pqueue_descr[ind].q_id,
00935 rt_pqueue_descr[ind].open_count
00936 );
00937 PROC_PRINT( "%-6ld %-6ld %-5ld ",
00938 rt_pqueue_descr[ind].data.attrs.mq_curmsgs,
00939 rt_pqueue_descr[ind].data.attrs.mq_maxmsg,
00940 rt_pqueue_descr[ind].data.attrs.mq_msgsize
00941 );
00942 PROC_PRINT( "%-4o %c %s\n",
00943 rt_pqueue_descr[ind].permissions,
00944 rt_pqueue_descr[ind].marked_for_deletion ? '*' : ' ',
00945 rt_pqueue_descr[ind].q_name
00946 );
00947 }
00948 }
00949 PROC_PRINT_DONE;
00950 }
00951
00952 static struct proc_dir_entry *proc_rtai_pqueue;
00953
00954 static int pqueue_proc_register(void)
00955 {
00956 proc_rtai_pqueue = create_proc_entry("pqueue", 0, rtai_proc_root);
00957 proc_rtai_pqueue->read_proc = pqueue_read_proc;
00958 return 0;
00959 }
00960
00961 static int pqueue_proc_unregister(void)
00962 {
00963 remove_proc_entry("pqueue", rtai_proc_root);
00964 return 0;
00965 }
00966 #endif
00967
00968
00969
00970
00971
00972 struct rt_native_fun_entry rt_pqueue_entries[] = {
00973 { { UR1(1, 5) | UR2(4, 6), _mq_open }, MQ_OPEN },
00974 { { 1, _mq_receive }, MQ_RECEIVE },
00975 { { 1, _mq_send }, MQ_SEND },
00976 { { 1, mq_close }, MQ_CLOSE },
00977 { { UW1(2, 3), mq_getattr }, MQ_GETATTR },
00978 { { UR1(2, 4) | UW1(3, 4), mq_setattr }, MQ_SETATTR },
00979 { { UR1(5, 6), _mq_notify }, MQ_NOTIFY },
00980 { { UR1(1, 2), mq_unlink }, MQ_UNLINK },
00981 { { 1, _mq_timedreceive }, MQ_TIMEDRECEIVE },
00982 { { 1, _mq_timedsend }, MQ_TIMEDSEND },
00983 { { 1, mq_reg_usp_notifier }, MQ_REG_USP_NOTIFIER },
00984 { { 0, 0 }, 000 }
00985 };
00986
00987 extern int set_rt_fun_entries(struct rt_native_fun_entry *entry);
00988 extern void reset_rt_fun_entries(struct rt_native_fun_entry *entry);
00989
00990 int __rtai_mq_init(void)
00991 {
00992 num_pqueues = 0;
00993 mq_mutex_init(&pqueue_mutex, NULL);
00994 #ifdef CONFIG_PROC_FS
00995 pqueue_proc_register();
00996 #endif
00997 printk(KERN_INFO "RTAI[mq]: loaded.\n");
00998 return set_rt_fun_entries(rt_pqueue_entries);
00999 return OK;
01000 }
01001
01002 void __rtai_mq_exit(void)
01003 {
01004 mq_mutex_destroy(&pqueue_mutex);
01005 reset_rt_fun_entries(rt_pqueue_entries);
01006 #ifdef CONFIG_PROC_FS
01007 pqueue_proc_unregister();
01008 #endif
01009 printk(KERN_INFO "RTAI[mq]: unloaded.\n");
01010 }
01011
01012 #ifndef CONFIG_RTAI_MQ_BUILTIN
01013 module_init(__rtai_mq_init);
01014 module_exit(__rtai_mq_exit);
01015 #endif