00001
00002
00003
00004 #include "ace/MEM_IO.h"
00005 #include "ace/Handle_Set.h"
00006
00007 #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
00008
00009 #if !defined (__ACE_INLINE__)
00010 #include "ace/MEM_IO.inl"
00011 #endif
00012
00013 ACE_RCSID(ace, MEM_IO, "$Id: MEM_IO.cpp 88708 2010-01-25 18:58:54Z johnnyw $")
00014
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO)
00018
00019 ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO (void)
00020 {
00021 }
00022
00023 int
00024 ACE_Reactive_MEM_IO::init (ACE_HANDLE handle,
00025 const ACE_TCHAR *name,
00026 MALLOC_OPTIONS *options)
00027 {
00028 ACE_TRACE ("ACE_Reactive_MEM_IO::init");
00029 this->handle_ = handle;
00030 return this->create_shm_malloc (name,
00031 options);
00032 }
00033
00034 ssize_t
00035 ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00036 int flags,
00037 const ACE_Time_Value *timeout)
00038 {
00039 ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
00040
00041 if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00042 return -1;
00043
00044 ACE_OFF_T new_offset = 0;
00045 ssize_t retv = ACE::recv (this->handle_,
00046 (char *) &new_offset,
00047 sizeof (ACE_OFF_T),
00048 flags,
00049 timeout);
00050
00051 if (retv == 0)
00052 {
00053
00054 buf = 0;
00055 return 0;
00056 }
00057 else if (retv != static_cast <ssize_t> (sizeof (ACE_OFF_T)))
00058 {
00059
00060 buf = 0;
00061 return -1;
00062 }
00063
00064 return this->get_buf_len (new_offset, buf);
00065 }
00066
00067 ssize_t
00068 ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00069 int flags,
00070 const ACE_Time_Value *timeout)
00071 {
00072 ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
00073
00074 if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00075 {
00076 return -1;
00077 }
00078
00079
00080 ACE_OFF_T offset =
00081 ACE_Utils::truncate_cast<ACE_OFF_T> (
00082 reinterpret_cast<char *> (buf)
00083 - static_cast<char *> (this->shm_malloc_->base_addr ()));
00084
00085
00086 if (ACE::send (this->handle_,
00087 (const char *) &offset,
00088 sizeof (offset),
00089 flags,
00090 timeout) != static_cast <ssize_t> (sizeof (offset)))
00091 {
00092
00093 this->release_buffer (buf);
00094
00095 return -1;
00096 }
00097
00098 return ACE_Utils::truncate_cast<ssize_t> (buf->size ());
00099 }
00100
00101 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00102 int
00103 ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node)
00104 {
00105 if (this->mq_ == 0)
00106 return -1;
00107
00108
00109
00110 if (this->mq_->tail_.addr () == 0)
00111 {
00112 this->mq_->head_ = new_node;
00113 this->mq_->tail_ = new_node;
00114 new_node->next_ = 0;
00115 }
00116 else
00117 {
00118 this->mq_->tail_->next_ = new_node;
00119 new_node->next_ = 0;
00120 this->mq_->tail_ = new_node;
00121 }
00122 return 0;
00123 }
00124
00125 ACE_MEM_SAP_Node *
00126 ACE_MT_MEM_IO::Simple_Queue::read ()
00127 {
00128 if (this->mq_ == 0)
00129 return 0;
00130
00131 ACE_MEM_SAP_Node *retv = 0;
00132
00133 ACE_SEH_TRY
00134 {
00135 retv = this->mq_->head_;
00136
00137
00138 if (this->mq_->head_ == this->mq_->tail_)
00139 {
00140
00141 this->mq_->head_ = 0;
00142 this->mq_->tail_ = 0;
00143 }
00144 else
00145 this->mq_->head_ = retv->next_;
00146 }
00147 ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
00148 {
00149 }
00150
00151 return retv;
00152 }
00153
00154 ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
00155 {
00156 delete this->recv_channel_.sema_;
00157 delete this->recv_channel_.lock_;
00158 delete this->send_channel_.sema_;
00159 delete this->send_channel_.lock_;
00160 }
00161
00162 int
00163 ACE_MT_MEM_IO::init (ACE_HANDLE handle,
00164 const ACE_TCHAR *name,
00165 MALLOC_OPTIONS *options)
00166 {
00167 ACE_TRACE ("ACE_MT_MEM_IO::init");
00168 ACE_UNUSED_ARG (handle);
00169
00170
00171
00172
00173 if (this->create_shm_malloc (name, options) == -1)
00174 return -1;
00175
00176 ACE_TCHAR server_sema [MAXPATHLEN];
00177 ACE_TCHAR client_sema [MAXPATHLEN];
00178 ACE_TCHAR server_lock [MAXPATHLEN];
00179 ACE_TCHAR client_lock [MAXPATHLEN];
00180 const ACE_TCHAR *basename = ACE::basename (name);
00181
00182
00183
00184 ACE_OS::strcpy (server_sema, basename);
00185 ACE_OS::strcat (server_sema, ACE_TEXT ("_sema_to_server"));
00186 ACE_OS::strcpy (client_sema, basename);
00187 ACE_OS::strcat (client_sema, ACE_TEXT ("_sema_to_client"));
00188 ACE_OS::strcpy (server_lock, basename);
00189 ACE_OS::strcat (server_lock, ACE_TEXT ("_lock_to_server"));
00190 ACE_OS::strcpy (client_lock, basename);
00191 ACE_OS::strcat (client_lock, ACE_TEXT ("_lock_to_client"));
00192
00193 void *to_server_ptr = 0;
00194
00195
00196
00197
00198 if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1)
00199 {
00200 void *ptr = 0;
00201
00202 ACE_ALLOCATOR_RETURN (ptr,
00203 this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)),
00204 -1);
00205
00206 MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (ptr);
00207 mymq->tail_ = 0;
00208 mymq->head_ = 0;
00209 (mymq + 1)->tail_ = 0;
00210 (mymq + 1)->head_ = 0;
00211 if (this->shm_malloc_->bind ("to_server", mymq) == -1)
00212 return -1;
00213
00214 if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1)
00215 return -1;
00216
00217 this->recv_channel_.queue_.init (mymq, this->shm_malloc_);
00218 ACE_NEW_RETURN (this->recv_channel_.sema_,
00219 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00220 -1);
00221 ACE_NEW_RETURN (this->recv_channel_.lock_,
00222 ACE_SYNCH_PROCESS_MUTEX (server_lock),
00223 -1);
00224
00225 this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_);
00226 ACE_NEW_RETURN (this->send_channel_.sema_,
00227 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00228 -1);
00229 ACE_NEW_RETURN (this->send_channel_.lock_,
00230 ACE_SYNCH_PROCESS_MUTEX (client_lock),
00231 -1);
00232 }
00233 else
00234 {
00235
00236 MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (to_server_ptr);
00237 this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_);
00238 ACE_NEW_RETURN (this->recv_channel_.sema_,
00239 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00240 -1);
00241 ACE_NEW_RETURN (this->recv_channel_.lock_,
00242 ACE_SYNCH_PROCESS_MUTEX (client_lock),
00243 -1);
00244
00245 this->send_channel_.queue_.init (mymq, this->shm_malloc_);
00246 ACE_NEW_RETURN (this->send_channel_.sema_,
00247 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00248 -1);
00249 ACE_NEW_RETURN (this->send_channel_.lock_,
00250 ACE_SYNCH_PROCESS_MUTEX (server_lock),
00251 -1);
00252 }
00253 return 0;
00254 }
00255
00256 ssize_t
00257 ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00258 int flags,
00259 const ACE_Time_Value *timeout)
00260 {
00261 ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
00262
00263
00264 ACE_UNUSED_ARG (timeout);
00265 ACE_UNUSED_ARG (flags);
00266
00267 if (this->shm_malloc_ == 0)
00268 {
00269 return -1;
00270 }
00271
00272
00273 if (this->recv_channel_.sema_->acquire () == -1)
00274 {
00275 return -1;
00276 }
00277
00278 {
00279
00280 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1);
00281
00282 buf = this->recv_channel_.queue_.read ();
00283
00284 if (buf != 0)
00285 {
00286 return ACE_Utils::truncate_cast<ssize_t> (buf->size ());
00287 }
00288
00289 return -1;
00290 }
00291 }
00292
00293 ssize_t
00294 ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00295 int flags,
00296 const ACE_Time_Value *timeout)
00297 {
00298 ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
00299
00300
00301 ACE_UNUSED_ARG (timeout);
00302 ACE_UNUSED_ARG (flags);
00303
00304 if (this->shm_malloc_ == 0)
00305 {
00306 return -1;
00307 }
00308
00309 {
00310
00311 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1);
00312
00313 if (this->send_channel_.queue_.write (buf) == -1)
00314 {
00315 this->release_buffer (buf);
00316 return -1;
00317 }
00318 }
00319
00320 if (this->send_channel_.sema_->release () == -1)
00321 {
00322 return -1;
00323 }
00324
00325 return ACE_Utils::truncate_cast<ssize_t> (buf->size ());
00326 }
00327 #endif
00328
00329 void
00330 ACE_MEM_IO::dump (void) const
00331 {
00332 #if defined (ACE_HAS_DUMP)
00333 ACE_TRACE ("ACE_MEM_IO::dump");
00334 #endif
00335 }
00336
00337 int
00338 ACE_MEM_IO::init (const ACE_TCHAR *name,
00339 ACE_MEM_IO::Signal_Strategy type,
00340 ACE_MEM_SAP::MALLOC_OPTIONS *options)
00341 {
00342 ACE_UNUSED_ARG (type);
00343
00344 delete this->deliver_strategy_;
00345 this->deliver_strategy_ = 0;
00346 switch (type)
00347 {
00348 case ACE_MEM_IO::Reactive:
00349 ACE_NEW_RETURN (this->deliver_strategy_,
00350 ACE_Reactive_MEM_IO (),
00351 -1);
00352 break;
00353 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00354 case ACE_MEM_IO::MT:
00355 ACE_NEW_RETURN (this->deliver_strategy_,
00356 ACE_MT_MEM_IO (),
00357 -1);
00358 break;
00359 #endif
00360 default:
00361 return -1;
00362 }
00363
00364 return this->deliver_strategy_->init (this->get_handle (),
00365 name,
00366 options);
00367 }
00368
00369 int
00370 ACE_MEM_IO::fini (void)
00371 {
00372 if (this->deliver_strategy_ != 0)
00373 {
00374 return this->deliver_strategy_->fini ();
00375 }
00376 else
00377 {
00378 return -1;
00379 }
00380 }
00381
00382
00383
00384
00385
00386
00387 ssize_t
00388 ACE_MEM_IO::send (const ACE_Message_Block *message_block,
00389 const ACE_Time_Value *timeout)
00390 {
00391 ACE_TRACE ("ACE_MEM_IO::send");
00392
00393 if (this->deliver_strategy_ == 0)
00394 {
00395 return -1;
00396 }
00397
00398 size_t len = message_block->total_length ();
00399
00400 if (len != 0)
00401 {
00402 ACE_MEM_SAP_Node *buf =
00403 reinterpret_cast<ACE_MEM_SAP_Node *> (
00404 this->deliver_strategy_->acquire_buffer (
00405 ACE_Utils::truncate_cast<ssize_t> (len)));
00406
00407 size_t n = 0;
00408
00409 while (message_block != 0)
00410 {
00411 ACE_OS::memcpy (static_cast<char *> (buf->data ()) + n,
00412 message_block->rd_ptr (),
00413 message_block->length ());
00414 n += message_block->length ();
00415
00416 if (message_block->cont ())
00417 {
00418 message_block = message_block->cont ();
00419 }
00420 else
00421 {
00422 message_block = message_block->next ();
00423 }
00424 }
00425
00426 buf->size_ = len;
00427
00428 return this->deliver_strategy_->send_buf (buf,
00429 0,
00430 timeout);
00431 }
00432
00433 return 0;
00434 }
00435
00436
00437 #if 0
00438 ssize_t
00439 ACE_MEM_IO::recvv (iovec *io_vec,
00440 const ACE_Time_Value *timeout)
00441 {
00442 ACE_TRACE ("ACE_MEM_IO::recvv");
00443 #if defined (FIONREAD)
00444 ACE_Handle_Set handle_set;
00445 handle_set.reset ();
00446 handle_set.set_bit (this->get_handle ());
00447
00448 io_vec->iov_base = 0;
00449
00450
00451 switch (ACE_OS::select (int (this->get_handle ()) + 1,
00452 handle_set,
00453 0, 0,
00454 timeout))
00455 {
00456 case -1:
00457 return -1;
00458
00459 case 0:
00460 errno = ETIME;
00461 return -1;
00462
00463 default:
00464
00465 break;
00466 }
00467
00468 int inlen;
00469
00470 if (ACE_OS::ioctl (this->get_handle (),
00471 FIONREAD,
00472 &inlen) == -1)
00473 return -1;
00474 else if (inlen > 0)
00475 {
00476 ACE_NEW_RETURN (io_vec->iov_base,
00477 char[inlen],
00478 -1);
00479 io_vec->iov_len = this->recv (io_vec->iov_base,
00480 inlen);
00481 return io_vec->iov_len;
00482 }
00483 else
00484 return 0;
00485 #else
00486 ACE_UNUSED_ARG (io_vec);
00487 ACE_UNUSED_ARG (timeout);
00488 ACE_NOTSUP_RETURN (-1);
00489 #endif
00490 }
00491
00492
00493
00494
00495
00496
00497 ssize_t
00498 ACE_MEM_IO::send (size_t n, ...) const
00499 {
00500 ACE_TRACE ("ACE_MEM_IO::send");
00501
00502 va_list argp;
00503 size_t total_tuples = n / 2;
00504 iovec *iovp;
00505 #if defined (ACE_HAS_ALLOCA)
00506 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00507 #else
00508 ACE_NEW_RETURN (iovp,
00509 iovec[total_tuples],
00510 -1);
00511 #endif
00512
00513 va_start (argp, n);
00514
00515 for (size_t i = 0; i < total_tuples; i++)
00516 {
00517 iovp[i].iov_base = va_arg (argp, char *);
00518 iovp[i].iov_len = va_arg (argp, ssize_t);
00519 }
00520
00521 ssize_t result = ACE_OS::sendv (this->get_handle (),
00522 iovp,
00523 total_tuples);
00524 #if !defined (ACE_HAS_ALLOCA)
00525 delete [] iovp;
00526 #endif
00527 va_end (argp);
00528 return result;
00529 }
00530
00531
00532
00533
00534
00535
00536
00537 ssize_t
00538 ACE_MEM_IO::recv (size_t n, ...) const
00539 {
00540 ACE_TRACE ("ACE_MEM_IO::recv");
00541
00542 va_list argp;
00543 size_t total_tuples = n / 2;
00544 iovec *iovp;
00545 #if defined (ACE_HAS_ALLOCA)
00546 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00547 #else
00548 ACE_NEW_RETURN (iovp,
00549 iovec[total_tuples],
00550 -1);
00551 #endif
00552
00553 va_start (argp, n);
00554
00555 for (size_t i = 0; i < total_tuples; i++)
00556 {
00557 iovp[i].iov_base = va_arg (argp, char *);
00558 iovp[i].iov_len = va_arg (argp, ssize_t);
00559 }
00560
00561 ssize_t result = ACE_OS::recvv (this->get_handle (),
00562 iovp,
00563 total_tuples);
00564 #if !defined (ACE_HAS_ALLOCA)
00565 delete [] iovp;
00566 #endif
00567 va_end (argp);
00568 return result;
00569 }
00570 #endif
00571
00572 ACE_END_VERSIONED_NAMESPACE_DECL
00573
00574 #endif