00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
#include <linux/module.h>
00031
#include <linux/kernel.h>
00032
#include <linux/version.h>
00033
#include <linux/errno.h>
00034
#include <linux/stat.h>
00035
#include <asm/uaccess.h>
00036
#ifdef CONFIG_PROC_FS
00037
#include <linux/proc_fs.h>
00038
extern struct proc_dir_entry *
rtai_proc_root;
00039
#endif
00040
#include <rtai_schedcore.h>
00041
#include <rtai_proc_fs.h>
00042
00043
MODULE_LICENSE("GPL");

/*
 * POSIX-flavoured spellings for the RTAI services this file uses.
 * Both the mutex and the condition variable types map onto RTAI's SEM,
 * and the attribute argument of the *_init forms is accepted but dropped.
 */
#define pthread_mutex_t                  SEM
#define pthread_cond_t                   SEM

#define pthread_mutex_init(mutex, attr)  rt_mutex_init(mutex)
#define pthread_mutex_lock               rt_mutex_lock
#define pthread_mutex_unlock             rt_mutex_unlock
#define pthread_mutex_destroy            rt_mutex_destroy

#define pthread_cond_init(cond, attr)    rt_cond_init(cond)
#define pthread_cond_wait                rt_cond_wait
#define pthread_cond_signal              rt_cond_signal
#define pthread_cond_destroy             rt_cond_destroy
00055
00056
/*
 * Generic success / failure return values, defined only when no earlier
 * header has provided them. ERROR is parenthesized so that the negative
 * literal cannot bind to neighbouring tokens at the point of use.
 */
#ifndef OK
#define OK 0
#endif

#ifndef ERROR
#define ERROR (-1)
#endif
00062
00063
00064
00065
00066
00067 #define MAX_RT_TASKS 128
00068
00069
00070
00071
00072
00073 static int num_pqueues = 0;
00074 static struct _pqueue_descr_struct
rt_pqueue_descr[
MAX_PQUEUES] = {{0}};
00075 static struct _pqueue_access_struct
task_pqueue_access[
MAX_RT_TASKS] = {{0}};
00076 static MQ_ATTR default_queue_attrs = {
MAX_MSGS,
MAX_MSGSIZE,
MQ_NONBLOCK, 0 };
00077
00078 static pthread_mutex_t pqueue_mutex;
00079
00080
00081
00082
00083
00084 static int name_to_id(
char *name)
00085 {
00086
int ind;
00087
for (ind = 0; ind <
MAX_PQUEUES; ind++) {
00088
if ((strcmp(
rt_pqueue_descr[ind].q_name,
"") != 0) &&
00089 (strcmp(
rt_pqueue_descr[ind].q_name, name) == 0)) {
00090
return ind;
00091 }
00092 }
00093
return ERROR;
00094 }
00095
00096
00097 static inline mq_bool_t
is_empty(
struct queue_control *q)
00098 {
00099
return q->attrs.mq_curmsgs == 0 ? TRUE : FALSE;
00100 }
00101
00102
00103 static inline mq_bool_t
is_full(
struct queue_control *q)
00104 {
00105
return q->attrs.mq_curmsgs == q->attrs.mq_maxmsg ? TRUE : FALSE;
00106 }
00107
00108
00109 static inline MSG_HDR*
getnode(Q_CTRL *queue)
00110 {
00111
return queue->nodind < queue->attrs.mq_maxmsg ? queue->nodes[queue->nodind++] : NULL;
00112 }
00113
00114 static inline int freenode(
void *node, Q_CTRL *queue)
00115 {
00116
if (queue->nodind > 0) {
00117 queue->nodes[--queue->nodind] = node;
00118
return 0;
00119 }
00120
return -EINVAL;
00121 }
00122
00123
00124 static void insert_message(Q_CTRL *q, MSG_HDR *this_msg)
00125 {
00126
00127
00128
00129
00130
00131 MSG_HDR *prev, *insertpt;
00132
00133
00134
00135
00136
00137
00138
if (((MSG_HDR *)q->tail)->priority >= this_msg->priority) {
00139 ((MSG_HDR*)q->tail)->next = this_msg;
00140 q->tail = this_msg;
00141 }
else {
00142 prev = insertpt = q->head;
00143
00144
00145
while (insertpt->priority >= this_msg->priority) {
00146 prev = insertpt;
00147 insertpt = insertpt->next;
00148 }
00149
00150
00151
if (insertpt == q->head) {
00152 this_msg->next = q->head;
00153 q->head = this_msg;
00154 }
else {
00155 this_msg->next = prev->next;
00156 prev->next = this_msg;
00157 }
00158 }
00159 }
00160
00161
#undef mqueues
00162 #define mqueues system_data_ptr
00163
00164 static mq_bool_t
is_blocking(MSG_QUEUE *q)
00165 {
00166
int q_ind;
00167
struct _pqueue_access_data *aces;
00168
00169 aces = ((QUEUE_CTRL)_rt_whoami()->mqueues)->q_access;
00170
for (q_ind = 0; q_ind <
MQ_OPEN_MAX; q_ind++) {
00171
if (aces[q_ind].q_id == q->q_id) {
00172
return (aces[q_ind].oflags & O_NONBLOCK) ? FALSE : TRUE;
00173 }
00174 }
00175
return FALSE;
00176 }
00177
00178
00179 static mq_bool_t
can_access(MSG_QUEUE *q, Q_ACCESS access)
00180 {
00181
RT_TASK *caller = _rt_whoami();
00182
00183
if (q->owner == caller ? (((access == FOR_READ) && (q->permissions & S_IRUSR)) || ((access == FOR_WRITE) && (q->permissions & S_IWUSR))) : (((access == FOR_READ) && (q->permissions & S_IRGRP)) || ((access == FOR_WRITE) && (q->permissions & S_IWGRP)))) {
00184
int q_ind;
00185
struct _pqueue_access_data *aces;
00186
struct _pqueue_access_struct *task_queue_data_ptr;
00187
int q_access_flags = 0;
00188
00189 task_queue_data_ptr = (QUEUE_CTRL)caller->mqueues;
00190
if (task_queue_data_ptr == NULL) {
00191
return FALSE;
00192 }
00193 aces = task_queue_data_ptr->q_access;
00194
for (q_ind = 0; q_ind <
MQ_OPEN_MAX; q_ind++) {
00195
if (aces[q_ind].q_id == q->q_id) {
00196 q_access_flags = aces[q_ind].oflags;
00197
goto set_mode;
00198 }
00199 }
00200
return FALSE;
00201 set_mode:
if (access == FOR_WRITE) {
00202
if ((q_access_flags & O_WRONLY) || (q_access_flags & O_RDWR)) {
00203
return TRUE;
00204 }
00205 }
else {
00206
return TRUE;
00207 }
00208 }
00209
return FALSE;
00210 }
00211
00212
00213 static inline void initialise_queue(Q_CTRL *q)
00214 {
00215
int msg_size, msg_ind;
00216
void *msg_ptr;
00217
00218 msg_size = q->attrs.mq_msgsize +
sizeof(MSG_HDR);
00219 msg_ptr = q->base;
00220 q->nodind = 0;
00221 q->nodes = msg_ptr + msg_size*q->attrs.mq_maxmsg;
00222
for (msg_ind = 0; msg_ind < q->attrs.mq_maxmsg; msg_ind++) {
00223 q->nodes[msg_ind] = msg_ptr;
00224 ((MSG_HDR *)msg_ptr)->size = 0;
00225 ((MSG_HDR *)msg_ptr)->priority =
MQ_MIN_MSG_PRIORITY;
00226 ((MSG_HDR *)msg_ptr)->next = NULL;
00227 msg_ptr += msg_size;
00228 }
00229 }
00230
00231
00232 static void delete_queue(
int q_index)
00233 {
00234
rt_free(
rt_pqueue_descr[q_index].data.base);
00235
00236
rt_pqueue_descr[q_index].owner = NULL;
00237
rt_pqueue_descr[q_index].open_count = 0;
00238 strcpy(
rt_pqueue_descr[q_index].q_name,
"\0");
00239
rt_pqueue_descr[q_index].q_id =
INVALID_PQUEUE;
00240
rt_pqueue_descr[q_index].data.base = NULL;
00241
rt_pqueue_descr[q_index].data.head = NULL;
00242
rt_pqueue_descr[q_index].data.tail = NULL;
00243
rt_pqueue_descr[q_index].data.attrs = (
MQ_ATTR){ 0, 0, 0, 0 };
00244
rt_pqueue_descr[q_index].permissions = 0;
00245
00246
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00247
pthread_mutex_destroy(&
rt_pqueue_descr[q_index].mutex);
00248
pthread_cond_destroy(&
rt_pqueue_descr[q_index].emp_cond);
00249
pthread_cond_destroy(&
rt_pqueue_descr[q_index].full_cond);
00250
00251
if (
num_pqueues > 0) {
00252
num_pqueues--;
00253 }
00254 }
00255
00256
00257
00258
00259
00260 mqd_t mq_open(
char *mq_name,
int oflags, mode_t permissions,
struct mq_attr *
mq_attr)
00261 {
00262
int q_index, t_index, q_ind;
00263
int spare_count = 0, first_spare = 0;
00264 mq_bool_t q_found = FALSE;
00265
RT_TASK *this_task = _rt_whoami();
00266
struct _pqueue_access_struct *task_data_ptr;
00267
00268 task_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00269
00270
pthread_mutex_lock(&
pqueue_mutex);
00271
if ((q_index =
name_to_id(mq_name)) >= 0) {
00272
00273
00274
00275
if ((oflags & O_CREAT) && (oflags & O_EXCL)) {
00276
pthread_mutex_unlock(&
pqueue_mutex);
00277
return -EEXIST;
00278 }
00279
if (task_data_ptr == NULL) {
00280
for (t_index = 0; t_index <
MAX_RT_TASKS; t_index++) {
00281
if (
task_pqueue_access[t_index].this_task == NULL) {
00282 task_data_ptr = &(
task_pqueue_access[t_index]);
00283 task_data_ptr->this_task = this_task;
00284 this_task->mqueues = task_data_ptr;
00285
break;
00286 }
00287 }
00288
if (t_index ==
MAX_RT_TASKS) {
00289
pthread_mutex_unlock(&
pqueue_mutex);
00290
return -ENOMEM;
00291 }
00292 }
00293
00294
00295
00296
00297
00298
pthread_mutex_lock(&
rt_pqueue_descr[q_index].mutex);
00299
for (q_ind = 0; q_ind <
MQ_OPEN_MAX; q_ind++) {
00300
if (task_data_ptr->q_access[q_ind].q_id ==
rt_pqueue_descr[q_index].q_id) {
00301 q_found = TRUE;
00302
break;
00303 }
else if(task_data_ptr->q_access[q_ind].q_id ==
INVALID_PQUEUE) {
00304
if (spare_count == 0) {
00305 first_spare = q_ind;
00306 }
00307 spare_count++;
00308 }
00309 }
00310
00311
00312
if (!q_found && spare_count == 0) {
00313
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00314
pthread_mutex_unlock(&
pqueue_mutex);
00315
return -EINVAL;
00316 }
00317
00318
00319
if (!q_found) {
00320
00321 task_data_ptr->n_open_pqueues++;
00322 q_ind = first_spare;
00323 }
00324 task_data_ptr->q_access[q_ind].q_id =
rt_pqueue_descr[q_index].q_id;
00325 task_data_ptr->q_access[q_ind].oflags = oflags;
00326
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00327 }
else if (oflags & O_CREAT) {
00328
00329
00330
00331
if(
num_pqueues >=
MAX_PQUEUES) {
00332
pthread_mutex_unlock(&
pqueue_mutex);
00333
return -ENOMEM;
00334 }
00335
00336
if( strlen(mq_name) >=
MQ_NAME_MAX) {
00337
pthread_mutex_unlock(&
pqueue_mutex);
00338
return -ENAMETOOLONG;
00339 }
00340
00341
00342
00343
00344
if (task_data_ptr == NULL) {
00345
00346
for (t_index = 0; t_index <
MAX_RT_TASKS; t_index++) {
00347
if (
task_pqueue_access[t_index].this_task == NULL) {
00348 task_data_ptr = &
task_pqueue_access[t_index];
00349 task_data_ptr->this_task = this_task;
00350 this_task->mqueues = task_data_ptr;
00351
break;
00352 }
00353 }
00354
if (t_index ==
MAX_RT_TASKS) {
00355
pthread_mutex_unlock(&
pqueue_mutex);
00356
return -ENOMEM;
00357 }
00358 }
else if (task_data_ptr->n_open_pqueues >=
MQ_OPEN_MAX) {
00359
pthread_mutex_unlock(&
pqueue_mutex);
00360
return -EINVAL;
00361 }
00362
00363
if (mq_attr == NULL) {
00364 mq_attr = &
default_queue_attrs;
00365 }
00366
00367
for (q_index = 0; q_index <
MAX_PQUEUES; q_index++) {
00368
if (
rt_pqueue_descr[q_index].q_id ==
INVALID_PQUEUE) {
00369
int msg_size, queue_size;
00370
void *mem_ptr;
00371
00372 msg_size = mq_attr->
mq_msgsize +
sizeof(MSG_HDR);
00373 queue_size = (msg_size +
sizeof(
void *))*mq_attr->
mq_maxmsg;
00374 mem_ptr =
rt_malloc(queue_size);
00375
if(mem_ptr == NULL) {
00376
pthread_mutex_unlock(&
pqueue_mutex);
00377
return -ENOMEM;
00378 }
00379
rt_pqueue_descr[q_index].data.base = mem_ptr;
00380
00381
rt_pqueue_descr[q_index].owner = this_task;
00382
rt_pqueue_descr[q_index].open_count = 0;
00383 strcpy(
rt_pqueue_descr[q_index].q_name, mq_name);
00384
rt_pqueue_descr[q_index].q_id = q_index + 1;
00385
rt_pqueue_descr[q_index].marked_for_deletion = FALSE;
00386
rt_pqueue_descr[q_index].data.head =
00387
rt_pqueue_descr[q_index].data.tail =
rt_pqueue_descr[q_index].data.base;
00388
rt_pqueue_descr[q_index].data.attrs = *(mq_attr);
00389
rt_pqueue_descr[q_index].data.attrs.mq_curmsgs = 0;
00390
rt_pqueue_descr[q_index].permissions = permissions;
00391
00392
pthread_cond_init(&
rt_pqueue_descr[q_index].emp_cond, NULL);
00393
pthread_cond_init(&
rt_pqueue_descr[q_index].full_cond, NULL);
00394
pthread_mutex_init(&
rt_pqueue_descr[q_index].mutex, NULL);
00395
00396
00397
initialise_queue(&
rt_pqueue_descr[q_index].data);
00398
00399 q_ind = task_data_ptr->n_open_pqueues++;
00400 task_data_ptr->q_access[q_ind].q_id = q_index + 1;
00401 task_data_ptr->q_access[q_ind].oflags = oflags;
00402
break;
00403 }
00404 }
00405
if(q_index >=
MAX_PQUEUES) {
00406
pthread_mutex_unlock(&
pqueue_mutex);
00407
return -EMFILE;
00408 }
00409
num_pqueues++;
00410 }
else {
00411
00412
00413
00414
pthread_mutex_unlock(&
pqueue_mutex);
00415
return -ENOENT;
00416 }
00417
00418
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_OPEN,
rt_pqueue_descr[q_index].q_id, 0, 0);
00419
00420
00421
rt_pqueue_descr[q_index].open_count++;
00422
pthread_mutex_unlock(&
pqueue_mutex);
00423
return (
mqd_t)
rt_pqueue_descr[q_index].q_id;
00424 }
00425
00426
00427 size_t
_mq_receive(
mqd_t mq,
char *msg_buffer, size_t buflen,
unsigned int *msgprio,
int space)
00428 {
00429
int q_index = mq - 1, size;
00430 MQMSG *msg_ptr;
00431 MSG_QUEUE *q;
00432
00433
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_RECV, mq, buflen, 0);
00434
00435
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00436
return -EBADF;
00437 }
00438 q = &
rt_pqueue_descr[q_index];
00439
if (
can_access(q, FOR_READ) == FALSE) {
00440
return -EINVAL;
00441 }
00442
pthread_mutex_lock(&q->mutex);
00443
while (
is_empty(&q->data)) {
00444
if (
is_blocking(q)) {
00445
pthread_cond_wait(&q->emp_cond, &q->mutex);
00446 }
else {
00447
pthread_mutex_unlock(&q->mutex);
00448
return -EAGAIN;
00449 }
00450 }
00451 msg_ptr = q->data.head;
00452
if (msg_ptr->hdr.size <= buflen) {
00453 size = msg_ptr->hdr.size;
00454
if (space) {
00455 memcpy(msg_buffer, &msg_ptr->data, size);
00456 *msgprio = msg_ptr->hdr.priority;
00457 }
else {
00458 copy_to_user(msg_buffer, &msg_ptr->data, size);
00459 copy_to_user(msgprio, &msg_ptr->hdr.priority,
sizeof(msgprio));
00460 }
00461 }
else {
00462 size =
ERROR;
00463 }
00464
00465 q->data.head = msg_ptr->hdr.next;
00466
if(q->data.head == NULL) {
00467 q->data.head = q->data.tail = q->data.base;
00468 }
00469
freenode(msg_ptr, &q->data);
00470 msg_ptr->hdr.size = 0;
00471 msg_ptr->hdr.next = NULL;
00472
rt_pqueue_descr[q_index].data.attrs.mq_curmsgs--;
00473
00474
pthread_cond_signal(&q->full_cond);
00475
pthread_mutex_unlock(&q->mutex);
00476
00477
return size;
00478 }
00479
00480
00481 size_t
_mq_timedreceive(
mqd_t mq,
char *msg_buffer, size_t buflen,
unsigned int *msgprio,
const struct timespec *abstime,
int space)
00482 {
00483
int q_index = mq - 1, size;
00484 MQMSG *msg_ptr;
00485 MSG_QUEUE *q;
00486
00487
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_RECV, mq, buflen, 0);
00488
00489
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00490
return -EBADF;
00491 }
00492 q = &
rt_pqueue_descr[q_index];
00493
if (
can_access(q, FOR_READ) == FALSE) {
00494
return -EINVAL;
00495 }
00496
pthread_mutex_lock(&q->mutex);
00497
while (
is_empty(&q->data)) {
00498
if (
is_blocking(q)) {
00499
struct timespec time;
00500
if (!space) {
00501 copy_from_user(&time, abstime,
sizeof(
struct timespec));
00502 abstime = &time;
00503 }
00504
if (
rt_cond_wait_until(&q->emp_cond, &q->mutex,
timespec2count(abstime)) > 1) {
00505
pthread_mutex_unlock(&q->mutex);
00506
return -ETIMEDOUT;
00507 }
00508 }
else {
00509
return -EAGAIN;
00510 }
00511 }
00512 msg_ptr = q->data.head;
00513
if (buflen < q->data.attrs.mq_msgsize) {
00514
pthread_mutex_unlock(&q->mutex);
00515
return -EMSGSIZE;
00516 }
00517
if (msg_ptr->hdr.size <= buflen) {
00518 size = msg_ptr->hdr.size;
00519
if (space) {
00520 memcpy(msg_buffer, &msg_ptr->data, size);
00521 *msgprio = msg_ptr->hdr.priority;
00522 }
else {
00523 copy_to_user(msg_buffer, &msg_ptr->data, size);
00524 copy_to_user(msgprio, &msg_ptr->hdr.priority,
sizeof(msgprio));
00525 }
00526 }
else {
00527 size =
ERROR;
00528 }
00529
00530 q->data.head = msg_ptr->hdr.next;
00531
if(q->data.head == NULL) {
00532 q->data.head = q->data.tail = q->data.base;
00533 }
00534
freenode(msg_ptr, &q->data);
00535 msg_ptr->hdr.size = 0;
00536 msg_ptr->hdr.next = NULL;
00537
rt_pqueue_descr[q_index].data.attrs.mq_curmsgs--;
00538
00539
pthread_cond_signal(&q->full_cond);
00540
pthread_mutex_unlock(&q->mutex);
00541
00542
return size;
00543 }
00544
00545
00546 int _mq_send(
mqd_t mq,
const char *msg, size_t msglen,
unsigned int msgprio,
int space)
00547 {
00548
int q_index = mq - 1;
00549 MSG_QUEUE *q;
00550 MSG_HDR *this_msg;
00551 mq_bool_t q_was_empty;
00552
00553
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_SEND, mq, msglen, msgprio);
00554
00555
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00556
return -EBADF;
00557 }
00558 q = &
rt_pqueue_descr[q_index];
00559
if(
can_access(q, FOR_WRITE) == FALSE) {
00560
return -EINVAL;
00561 }
00562
if(msgprio >
MQ_PRIO_MAX) {
00563
return -EINVAL;
00564 }
00565
pthread_mutex_lock(&q->mutex);
00566
while (
is_full(&q->data)) {
00567
if (
is_blocking(q)) {
00568
pthread_cond_wait(&q->full_cond, &q->mutex);
00569 }
else {
00570
pthread_mutex_unlock(&q->mutex);
00571
return -EAGAIN;
00572 }
00573 }
00574
if( (this_msg =
getnode(&q->data)) == NULL) {
00575
pthread_mutex_unlock(&q->mutex);
00576
return -ENOBUFS;
00577 }
00578 q_was_empty =
is_empty(&q->data);
00579 q->data.attrs.mq_curmsgs++;
00580
if (msglen > q->data.attrs.mq_msgsize) {
00581
pthread_mutex_unlock(&q->mutex);
00582
return -EMSGSIZE;
00583 }
00584 this_msg->size = msglen;
00585 this_msg->priority = msgprio;
00586
if (space) {
00587 memcpy(&((MQMSG *)this_msg)->data, msg, msglen);
00588 }
else {
00589 copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen);
00590 }
00591
insert_message(&q->data, this_msg);
00592
pthread_cond_signal(&q->emp_cond);
00593
00594
if(q_was_empty &&
rt_pqueue_descr[q_index].notify.task != NULL) {
00595
00596
00597
00598
00599
00600
00601
rt_pqueue_descr[q_index].notify.task = NULL;
00602 }
00603
pthread_mutex_unlock(&q->mutex);
00604
return msglen;
00605 }
00606
00607
00608 int _mq_timedsend(
mqd_t mq,
const char *msg, size_t msglen,
unsigned int msgprio,
const struct timespec *abstime,
int space)
00609 {
00610
int q_index = mq - 1;
00611 MSG_QUEUE *q;
00612 MSG_HDR *this_msg;
00613 mq_bool_t q_was_empty;
00614
00615
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_SEND, mq, msglen, msgprio);
00616
00617
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00618
return -EBADF;
00619 }
00620 q = &
rt_pqueue_descr[q_index];
00621
if (
can_access(q, FOR_WRITE) == FALSE) {
00622
return -EINVAL;
00623 }
00624
if (msgprio >
MQ_PRIO_MAX) {
00625
return -EINVAL;
00626 }
00627
pthread_mutex_lock(&q->mutex);
00628
while (
is_full(&q->data)) {
00629
if (
is_blocking(q)) {
00630
struct timespec time;
00631
if (!space) {
00632 copy_from_user(&time, abstime,
sizeof(
struct timespec));
00633 abstime = &time;
00634 }
00635
if (
rt_cond_wait_until(&q->full_cond, &q->mutex,
timespec2count(abstime)) > 1) {
00636
pthread_mutex_unlock(&q->mutex);
00637
return -ETIMEDOUT;
00638 }
00639 }
else {
00640
pthread_mutex_unlock(&q->mutex);
00641
return -EAGAIN;
00642 }
00643 }
00644
if ((this_msg =
getnode(&q->data)) == NULL) {
00645
pthread_mutex_unlock(&q->mutex);
00646
return -ENOBUFS;
00647 }
00648 q_was_empty =
is_empty(&q->data);
00649 q->data.attrs.mq_curmsgs++;
00650
if (msglen > q->data.attrs.mq_msgsize) {
00651
pthread_mutex_unlock(&q->mutex);
00652
return -EMSGSIZE;
00653 }
00654 this_msg->size = msglen;
00655 this_msg->priority = msgprio;
00656
if (space) {
00657 memcpy(&((MQMSG *)this_msg)->data, msg, msglen);
00658 }
else {
00659 copy_from_user(&((MQMSG *)this_msg)->data, msg, msglen);
00660 }
00661
insert_message(&q->data, this_msg);
00662
pthread_cond_signal(&q->emp_cond);
00663
00664
if (q_was_empty &&
rt_pqueue_descr[q_index].notify.task != NULL) {
00665
00666
00667
00668
00669
00670
00671
00672
rt_pqueue_descr[q_index].notify.task = NULL;
00673 }
00674
pthread_mutex_unlock(&q->mutex);
00675
return msglen;
00676
00677 }
00678
00679
00680 int mq_close(
mqd_t mq)
00681 {
00682
int q_index = mq - 1;
00683
int q_ind;
00684
RT_TASK *this_task = _rt_whoami();
00685
struct _pqueue_access_struct *task_queue_data_ptr;
00686
00687
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_CLOSE, mq, 0, 0);
00688
00689
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00690
return -EINVAL;
00691 }
00692 task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00693
if (task_queue_data_ptr == NULL ) {
00694
return -EINVAL;
00695 }
00696
pthread_mutex_lock(&
pqueue_mutex);
00697
for (q_ind = 0; q_ind <
MQ_OPEN_MAX; q_ind++) {
00698
if (task_queue_data_ptr->q_access[q_ind].q_id == mq) {
00699 task_queue_data_ptr->q_access[q_ind].q_id =
INVALID_PQUEUE;
00700 task_queue_data_ptr->n_open_pqueues--;
00701
break;
00702 }
00703 }
00704
if (q_ind ==
MQ_OPEN_MAX) {
00705
pthread_mutex_unlock(&
pqueue_mutex);
00706
return -EINVAL;
00707 }
00708
pthread_mutex_lock(&
rt_pqueue_descr[q_index].mutex);
00709
if (
rt_pqueue_descr[q_index].notify.task == this_task) {
00710
rt_pqueue_descr[q_index].notify.task = NULL;
00711 }
00712
if (--
rt_pqueue_descr[q_index].open_count <= 0 &&
00713
rt_pqueue_descr[q_index].marked_for_deletion == TRUE ) {
00714
delete_queue(q_index);
00715 }
else {
00716
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00717 }
00718
pthread_mutex_unlock(&
pqueue_mutex);
00719
return OK;
00720 }
00721
00722
00723 int mq_getattr(
mqd_t mq,
struct mq_attr *attrbuf)
00724 {
00725
int q_index = mq - 1;
00726
00727
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_GET_ATTR, mq, 0, 0);
00728
00729
if (0 <= q_index && q_index <
MAX_PQUEUES) {
00730 *attrbuf =
rt_pqueue_descr[q_index].data.attrs;
00731
return OK;
00732 }
00733
return -EBADF;
00734 }
00735
00736
00737 int mq_setattr(
mqd_t mq,
const struct mq_attr *new_attrs,
struct mq_attr *old_attrs)
00738 {
00739
int q_index = mq - 1;
00740
int q_ind;
00741
RT_TASK *this_task = _rt_whoami();
00742
struct _pqueue_access_struct *task_queue_data_ptr;
00743
00744
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_SET_ATTR, mq, 0, 0);
00745
00746
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00747
return -EBADF;
00748 }
00749
if (old_attrs != NULL) {
00750 *old_attrs =
rt_pqueue_descr[q_index].data.attrs;
00751 }
00752 task_queue_data_ptr = (QUEUE_CTRL)this_task->mqueues;
00753
if (task_queue_data_ptr == NULL) {
00754
return -EINVAL;
00755 }
00756
for (q_ind = 0; q_ind <
MQ_OPEN_MAX; q_ind++) {
00757
if (task_queue_data_ptr->q_access[q_ind].q_id == mq) {
00758
if(new_attrs->
mq_flags ==
MQ_NONBLOCK) {
00759 task_queue_data_ptr->q_access[q_ind].oflags |= O_NONBLOCK;
00760 }
else if (new_attrs->
mq_flags ==
MQ_BLOCK) {
00761 task_queue_data_ptr->q_access[q_ind].oflags &= ~O_NONBLOCK;
00762 }
else {
00763
return -EINVAL;
00764 }
00765
break;
00766 }
00767 }
00768
if (q_ind ==
MQ_OPEN_MAX) {
00769
return -EINVAL;
00770 }
00771
pthread_mutex_lock(&
rt_pqueue_descr[q_index].mutex);
00772
rt_pqueue_descr[q_index].data.attrs.mq_flags = new_attrs->
mq_flags;
00773
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00774
return OK;
00775 }
00776
00777
00778 int mq_notify(
mqd_t mq,
const struct sigevent *notification)
00779 {
00780
int q_index = mq - 1;
00781
int rtn;
00782
00783
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_NOTIFY, mq, 0, 0);
00784
00785
if (q_index < 0 || q_index >=
MAX_PQUEUES) {
00786
return -EBADF;
00787 }
00788
pthread_mutex_lock(&
rt_pqueue_descr[q_index].mutex);
00789
if (notification != NULL) {
00790
if (
rt_pqueue_descr[q_index].notify.task != NULL) {
00791
rt_pqueue_descr[q_index].notify.task = _rt_whoami();
00792
rt_pqueue_descr[q_index].notify.data = *notification;
00793 rtn =
OK;
00794 }
else {
00795 rtn =
ERROR;
00796 }
00797 }
else {
00798
if (
rt_pqueue_descr[q_index].notify.task == _rt_whoami()) {
00799
rt_pqueue_descr[q_index].notify.task = NULL;
00800 rtn =
OK;
00801 }
else {
00802 rtn =
ERROR;
00803 }
00804 }
00805
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00806
return rtn;
00807 }
00808
00809 int mq_unlink(
char *mq_name)
00810 {
00811
int q_index, rtn;
00812
00813
pthread_mutex_lock(&
pqueue_mutex);
00814 q_index =
name_to_id(mq_name);
00815
00816
TRACE_RTAI_POSIX(TRACE_RTAI_EV_POSIX_MQ_UNLINK, q_index, 0, 0);
00817
00818
if (q_index < 0) {
00819
pthread_mutex_unlock(&
pqueue_mutex);
00820
return -ENOENT;
00821 }
00822
pthread_mutex_lock(&
rt_pqueue_descr[q_index].mutex);
00823
if (
rt_pqueue_descr[q_index].open_count > 0) {
00824 strcpy(
rt_pqueue_descr[q_index].q_name,
"\0");
00825
rt_pqueue_descr[q_index].marked_for_deletion = TRUE;
00826
pthread_mutex_unlock(&
rt_pqueue_descr[q_index].mutex);
00827 rtn =
rt_pqueue_descr[q_index].open_count;
00828 }
else {
00829
delete_queue(q_index);
00830 rtn =
OK;
00831 }
00832
pthread_mutex_unlock(&
pqueue_mutex);
00833
return rtn;
00834 }
00835
00836
00837
00838
00839
00840
#ifdef CONFIG_PROC_FS
00841
00842
static int pqueue_read_proc(
char *page,
char **start, off_t off,
int count,
00843
int *eof,
void *data)
00844 {
00845
PROC_PRINT_VARS;
00846
int ind;
00847
00848
PROC_PRINT(
"\nRTAI Posix Queue Status\n");
00849
PROC_PRINT(
"-----------------------\n\n");
00850
PROC_PRINT(
"MAX_PQUEUES = %2d (system wide)\n", MAX_PQUEUES);
00851
PROC_PRINT(
"MQ_OPEN_MAX = %2d (per RT task)\n", MQ_OPEN_MAX);
00852
PROC_PRINT(
"MQ_NAME_MAX = %d\n", MQ_NAME_MAX);
00853
00854
PROC_PRINT(
"\nID NOpen NMsgs MaxMsgs MaxSz Perms Del Name\n");
00855
PROC_PRINT(
"--------------------------------------------------------------------------------\n");
00856
for (ind = 0; ind <
MAX_PQUEUES; ind++) {
00857
if (
rt_pqueue_descr[ind].q_name[0] ||
rt_pqueue_descr[ind].open_count) {
00858
PROC_PRINT(
"%-3d %-6d ",
00859 rt_pqueue_descr[ind].q_id,
00860 rt_pqueue_descr[ind].open_count
00861 );
00862
PROC_PRINT(
"%-6ld %-6ld %-5ld ",
00863 rt_pqueue_descr[ind].data.attrs.mq_curmsgs,
00864 rt_pqueue_descr[ind].data.attrs.mq_maxmsg,
00865 rt_pqueue_descr[ind].data.attrs.mq_msgsize
00866 );
00867
PROC_PRINT(
"%-4o %c %s\n",
00868 rt_pqueue_descr[ind].permissions,
00869 rt_pqueue_descr[ind].marked_for_deletion ?
'*' :
' ',
00870 rt_pqueue_descr[ind].q_name
00871 );
00872 }
00873 }
00874
PROC_PRINT_DONE;
00875 }
00876
00877
static struct proc_dir_entry *proc_rtai_pqueue;
00878
00879
static int pqueue_proc_register(
void)
00880 {
00881 proc_rtai_pqueue = create_proc_entry(
"pqueue", 0, rtai_proc_root);
00882 proc_rtai_pqueue->read_proc = pqueue_read_proc;
00883
return 0;
00884 }
00885
00886
static int pqueue_proc_unregister(
void)
00887 {
00888 remove_proc_entry(
"pqueue", rtai_proc_root);
00889
return 0;
00890 }
00891
#endif
00892
00893
00894
00895
00896
00897 struct rt_native_fun_entry
rt_pqueue_entries[] = {
00898 { {
UR1(1, 5) |
UR2(4, 6),
mq_open },
MQ_OPEN },
00899 { { 1,
_mq_receive },
MQ_RECEIVE },
00900 { { 1,
_mq_send },
MQ_SEND },
00901 { { 1,
mq_close },
MQ_CLOSE },
00902 { {
UW1(2, 3),
mq_getattr },
MQ_GETATTR },
00903 { {
UR1(2, 4) |
UW1(3, 4),
mq_setattr },
MQ_SETATTR },
00904 { {
UR1(2, 3),
mq_notify },
MQ_NOTIFY },
00905 { {
UR1(1, 2),
mq_unlink },
MQ_UNLINK },
00906 { { 1,
_mq_timedreceive },
MQ_TIMEDRECEIVE },
00907 { { 1,
_mq_timedsend },
MQ_TIMEDSEND },
00908 { { 0, 0 }, 000 }
00909 };
00910
00911
extern int set_rt_fun_entries(
struct rt_native_fun_entry *entry);
00912
extern void reset_rt_fun_entries(
struct rt_native_fun_entry *entry);
00913
00914 int __rtai_mq_init(
void)
00915 {
00916
num_pqueues = 0;
00917
pthread_mutex_init(&
pqueue_mutex, NULL);
00918
#ifdef CONFIG_PROC_FS
00919
pqueue_proc_register();
00920
#endif
00921
printk(KERN_INFO
"RTAI[mq]: loaded.\n");
00922
return set_rt_fun_entries(
rt_pqueue_entries);
00923
return OK;
00924 }
00925
00926 void __rtai_mq_exit(
void)
00927 {
00928
pthread_mutex_destroy(&
pqueue_mutex);
00929
reset_rt_fun_entries(
rt_pqueue_entries);
00930
#ifdef CONFIG_PROC_FS
00931
pqueue_proc_unregister();
00932
#endif
00933
printk(KERN_INFO
"RTAI[mq]: unloaded.\n");
00934 }
00935
00936
/* Hook the init/exit routines up as module entry points unless built in. */
#ifndef CONFIG_RTAI_MQ_BUILTIN
module_init(__rtai_mq_init);
module_exit(__rtai_mq_exit);
#endif

/* Symbols exported to other kernel modules under CONFIG_KBUILD. */
#ifdef CONFIG_KBUILD
EXPORT_SYMBOL(mq_open);
EXPORT_SYMBOL(_mq_receive);
EXPORT_SYMBOL(_mq_timedreceive);
EXPORT_SYMBOL(_mq_send);
EXPORT_SYMBOL(_mq_timedsend);
EXPORT_SYMBOL(mq_close);
EXPORT_SYMBOL(mq_getattr);
EXPORT_SYMBOL(mq_setattr);
EXPORT_SYMBOL(mq_notify);
EXPORT_SYMBOL(mq_unlink);
#endif