00001
00002
00003
00004 #include "ace/MEM_IO.h"
00005 #include "ace/Handle_Set.h"
00006
00007 #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
00008
00009 #if !defined (__ACE_INLINE__)
00010 #include "ace/MEM_IO.inl"
00011 #endif
00012
00013 ACE_RCSID(ace, MEM_IO, "MEM_IO.cpp,v 4.24 2006/03/14 21:15:49 sjiang Exp")
00014
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO)
00018
00019 ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO (void)
00020 {
00021 }
00022
00023 int
00024 ACE_Reactive_MEM_IO::init (ACE_HANDLE handle,
00025 const ACE_TCHAR *name,
00026 MALLOC_OPTIONS *options)
00027 {
00028 ACE_TRACE ("ACE_Reactive_MEM_IO::init");
00029 this->handle_ = handle;
00030 return this->create_shm_malloc (name,
00031 options);
00032 }
00033
00034 ssize_t
00035 ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00036 int flags,
00037 const ACE_Time_Value *timeout)
00038 {
00039 ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
00040
00041 if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00042 return -1;
00043
00044 off_t new_offset = 0;
00045 ssize_t retv = ACE::recv (this->handle_,
00046 (char *) &new_offset,
00047 sizeof (off_t),
00048 flags,
00049 timeout);
00050
00051 if (retv == 0)
00052 {
00053
00054 buf = 0;
00055 return 0;
00056 }
00057 else if (retv != sizeof (off_t))
00058 {
00059
00060 buf = 0;
00061 return -1;
00062 }
00063
00064 return this->get_buf_len (new_offset, buf);
00065 }
00066
00067 ssize_t
00068 ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00069 int flags,
00070 const ACE_Time_Value *timeout)
00071 {
00072 ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
00073
00074 if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00075 return -1;
00076
00077 off_t offset = reinterpret_cast<char *> (buf) -
00078 static_cast<char *> (this->shm_malloc_->base_addr ());
00079
00080 if (ACE::send (this->handle_,
00081 (const char *) &offset,
00082 sizeof (offset),
00083 flags,
00084 timeout) != sizeof (offset))
00085 {
00086
00087 this->release_buffer (buf);
00088
00089 return -1;
00090 }
00091 return buf->size ();
00092 }
00093
00094 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00095 int
00096 ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node)
00097 {
00098 if (this->mq_ == 0)
00099 return -1;
00100
00101
00102
00103 if (this->mq_->tail_.addr () == 0)
00104 {
00105 this->mq_->head_ = new_node;
00106 this->mq_->tail_ = new_node;
00107 new_node->next_ = 0;
00108 }
00109 else
00110 {
00111 this->mq_->tail_->next_ = new_node;
00112 new_node->next_ = 0;
00113 this->mq_->tail_ = new_node;
00114 }
00115 return 0;
00116 }
00117
00118 ACE_MEM_SAP_Node *
00119 ACE_MT_MEM_IO::Simple_Queue::read ()
00120 {
00121 if (this->mq_ == 0)
00122 return 0;
00123
00124 ACE_MEM_SAP_Node *retv = 0;
00125
00126 ACE_SEH_TRY
00127 {
00128 retv = this->mq_->head_;
00129
00130
00131 if (this->mq_->head_ == this->mq_->tail_)
00132 {
00133
00134 this->mq_->head_ = 0;
00135 this->mq_->tail_ = 0;
00136 }
00137 else
00138 this->mq_->head_ = retv->next_;
00139 }
00140 ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
00141 {
00142 }
00143
00144 return retv;
00145 }
00146
00147 ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
00148 {
00149 delete this->recv_channel_.sema_;
00150 delete this->recv_channel_.lock_;
00151 delete this->send_channel_.sema_;
00152 delete this->send_channel_.lock_;
00153 }
00154
00155 int
00156 ACE_MT_MEM_IO::init (ACE_HANDLE handle,
00157 const ACE_TCHAR *name,
00158 MALLOC_OPTIONS *options)
00159 {
00160 ACE_TRACE ("ACE_MT_MEM_IO::init");
00161 ACE_UNUSED_ARG (handle);
00162
00163
00164
00165
00166 if (this->create_shm_malloc (name, options) == -1)
00167 return -1;
00168
00169 ACE_TCHAR server_sema [MAXPATHLEN];
00170 ACE_TCHAR client_sema [MAXPATHLEN];
00171 ACE_TCHAR server_lock [MAXPATHLEN];
00172 ACE_TCHAR client_lock [MAXPATHLEN];
00173 const ACE_TCHAR *basename = ACE::basename (name);
00174
00175
00176
00177 ACE_OS::strcpy (server_sema, basename);
00178 ACE_OS::strcat (server_sema, ACE_LIB_TEXT ("_sema_to_server"));
00179 ACE_OS::strcpy (client_sema, basename);
00180 ACE_OS::strcat (client_sema, ACE_LIB_TEXT ("_sema_to_client"));
00181 ACE_OS::strcpy (server_lock, basename);
00182 ACE_OS::strcat (server_lock, ACE_LIB_TEXT ("_lock_to_server"));
00183 ACE_OS::strcpy (client_lock, basename);
00184 ACE_OS::strcat (client_lock, ACE_LIB_TEXT ("_lock_to_client"));
00185
00186 void *to_server_ptr = 0;
00187
00188
00189
00190
00191 if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1)
00192 {
00193 void *ptr = 0;
00194
00195 ACE_ALLOCATOR_RETURN (ptr,
00196 this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)),
00197 -1);
00198
00199 MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (ptr);
00200 mymq->tail_ = 0;
00201 mymq->head_ = 0;
00202 (mymq + 1)->tail_ = 0;
00203 (mymq + 1)->head_ = 0;
00204 if (this->shm_malloc_->bind ("to_server", mymq) == -1)
00205 return -1;
00206
00207 if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1)
00208 return -1;
00209
00210 this->recv_channel_.queue_.init (mymq, this->shm_malloc_);
00211 ACE_NEW_RETURN (this->recv_channel_.sema_,
00212 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00213 -1);
00214 ACE_NEW_RETURN (this->recv_channel_.lock_,
00215 ACE_SYNCH_PROCESS_MUTEX (server_lock),
00216 -1);
00217
00218 this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_);
00219 ACE_NEW_RETURN (this->send_channel_.sema_,
00220 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00221 -1);
00222 ACE_NEW_RETURN (this->send_channel_.lock_,
00223 ACE_SYNCH_PROCESS_MUTEX (client_lock),
00224 -1);
00225 }
00226 else
00227 {
00228
00229 MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (to_server_ptr);
00230 this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_);
00231 ACE_NEW_RETURN (this->recv_channel_.sema_,
00232 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00233 -1);
00234 ACE_NEW_RETURN (this->recv_channel_.lock_,
00235 ACE_SYNCH_PROCESS_MUTEX (client_lock),
00236 -1);
00237
00238 this->send_channel_.queue_.init (mymq, this->shm_malloc_);
00239 ACE_NEW_RETURN (this->send_channel_.sema_,
00240 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00241 -1);
00242 ACE_NEW_RETURN (this->send_channel_.lock_,
00243 ACE_SYNCH_PROCESS_MUTEX (server_lock),
00244 -1);
00245 }
00246 return 0;
00247 }
00248
00249 ssize_t
00250 ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00251 int flags,
00252 const ACE_Time_Value *timeout)
00253 {
00254 ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
00255
00256
00257 ACE_UNUSED_ARG (timeout);
00258 ACE_UNUSED_ARG (flags);
00259
00260 if (this->shm_malloc_ == 0)
00261 return -1;
00262
00263
00264 if (this->recv_channel_.sema_->acquire () == -1)
00265 return -1;
00266
00267 {
00268
00269 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1);
00270
00271 buf = this->recv_channel_.queue_.read ();
00272 if (buf != 0)
00273 return buf->size ();
00274 return -1;
00275 }
00276 }
00277
00278 ssize_t
00279 ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00280 int flags,
00281 const ACE_Time_Value *timeout)
00282 {
00283 ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
00284
00285
00286 ACE_UNUSED_ARG (timeout);
00287 ACE_UNUSED_ARG (flags);
00288
00289 if (this->shm_malloc_ == 0)
00290 return -1;
00291
00292 {
00293
00294 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1);
00295
00296 if (this->send_channel_.queue_.write (buf) == -1)
00297 {
00298 this->release_buffer (buf);
00299 return -1;
00300 }
00301 }
00302
00303 if (this->send_channel_.sema_->release () == -1)
00304 return -1;
00305
00306 return buf->size ();
00307 }
00308 #endif
00309
00310 void
00311 ACE_MEM_IO::dump (void) const
00312 {
00313 #if defined (ACE_HAS_DUMP)
00314 ACE_TRACE ("ACE_MEM_IO::dump");
00315 #endif
00316 }
00317
00318 int
00319 ACE_MEM_IO::init (const ACE_TCHAR *name,
00320 ACE_MEM_IO::Signal_Strategy type,
00321 ACE_MEM_SAP::MALLOC_OPTIONS *options)
00322 {
00323 ACE_UNUSED_ARG (type);
00324
00325 delete this->deliver_strategy_;
00326 this->deliver_strategy_ = 0;
00327 switch (type)
00328 {
00329 case ACE_MEM_IO::Reactive:
00330 ACE_NEW_RETURN (this->deliver_strategy_,
00331 ACE_Reactive_MEM_IO (),
00332 -1);
00333 break;
00334 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00335 case ACE_MEM_IO::MT:
00336 ACE_NEW_RETURN (this->deliver_strategy_,
00337 ACE_MT_MEM_IO (),
00338 -1);
00339 break;
00340 #endif
00341 default:
00342 return -1;
00343 }
00344
00345 return this->deliver_strategy_->init (this->get_handle (),
00346 name,
00347 options);
00348 }
00349
00350 int
00351 ACE_MEM_IO::fini ()
00352 {
00353 if (this->deliver_strategy_ != 0)
00354 return this->deliver_strategy_->fini ();
00355 else
00356 return -1;
00357 }
00358
00359
00360
00361
00362
00363
00364 ssize_t
00365 ACE_MEM_IO::send (const ACE_Message_Block *message_block,
00366 const ACE_Time_Value *timeout)
00367 {
00368 ACE_TRACE ("ACE_MEM_IO::send");
00369
00370 if (this->deliver_strategy_ == 0)
00371 return -1;
00372
00373 size_t len = message_block->total_length ();
00374
00375 if (len != 0)
00376 {
00377 ACE_MEM_SAP_Node *buf =
00378 reinterpret_cast<ACE_MEM_SAP_Node *> (
00379 this->deliver_strategy_->acquire_buffer (len));
00380 size_t n = 0;
00381 while (message_block != 0)
00382 {
00383 ACE_OS::memcpy (static_cast<char *> (buf->data ()) + n,
00384 message_block->rd_ptr (),
00385 message_block->length ());
00386 n += message_block->length ();
00387
00388 if (message_block->cont ())
00389 message_block = message_block->cont ();
00390 else
00391 message_block = message_block->next ();
00392 }
00393
00394 buf->size_ = len;
00395
00396 return this->deliver_strategy_->send_buf (buf,
00397 0,
00398 timeout);
00399 }
00400 return 0;
00401 }
00402
00403
00404 #if 0
00405 ssize_t
00406 ACE_MEM_IO::recvv (iovec *io_vec,
00407 const ACE_Time_Value *timeout)
00408 {
00409 ACE_TRACE ("ACE_MEM_IO::recvv");
00410 #if defined (FIONREAD)
00411 ACE_Handle_Set handle_set;
00412 handle_set.reset ();
00413 handle_set.set_bit (this->get_handle ());
00414
00415 io_vec->iov_base = 0;
00416
00417
00418 switch (ACE_OS::select (int (this->get_handle ()) + 1,
00419 handle_set,
00420 0, 0,
00421 timeout))
00422 {
00423 case -1:
00424 return -1;
00425
00426 case 0:
00427 errno = ETIME;
00428 return -1;
00429
00430 default:
00431
00432 break;
00433 }
00434
00435 int inlen;
00436
00437 if (ACE_OS::ioctl (this->get_handle (),
00438 FIONREAD,
00439 &inlen) == -1)
00440 return -1;
00441 else if (inlen > 0)
00442 {
00443 ACE_NEW_RETURN (io_vec->iov_base,
00444 char[inlen],
00445 -1);
00446 io_vec->iov_len = this->recv (io_vec->iov_base,
00447 inlen);
00448 return io_vec->iov_len;
00449 }
00450 else
00451 return 0;
00452 #else
00453 ACE_UNUSED_ARG (io_vec);
00454 ACE_UNUSED_ARG (timeout);
00455 ACE_NOTSUP_RETURN (-1);
00456 #endif
00457 }
00458
00459
00460
00461
00462
00463
00464 ssize_t
00465 ACE_MEM_IO::send (size_t n, ...) const
00466 {
00467 ACE_TRACE ("ACE_MEM_IO::send");
00468
00469 va_list argp;
00470 size_t total_tuples = n / 2;
00471 iovec *iovp;
00472 #if defined (ACE_HAS_ALLOCA)
00473 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00474 #else
00475 ACE_NEW_RETURN (iovp,
00476 iovec[total_tuples],
00477 -1);
00478 #endif
00479
00480 va_start (argp, n);
00481
00482 for (size_t i = 0; i < total_tuples; i++)
00483 {
00484 iovp[i].iov_base = va_arg (argp, char *);
00485 iovp[i].iov_len = va_arg (argp, ssize_t);
00486 }
00487
00488 ssize_t result = ACE_OS::sendv (this->get_handle (),
00489 iovp,
00490 total_tuples);
00491 #if !defined (ACE_HAS_ALLOCA)
00492 delete [] iovp;
00493 #endif
00494 va_end (argp);
00495 return result;
00496 }
00497
00498
00499
00500
00501
00502
00503
00504 ssize_t
00505 ACE_MEM_IO::recv (size_t n, ...) const
00506 {
00507 ACE_TRACE ("ACE_MEM_IO::recv");
00508
00509 va_list argp;
00510 size_t total_tuples = n / 2;
00511 iovec *iovp;
00512 #if defined (ACE_HAS_ALLOCA)
00513 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00514 #else
00515 ACE_NEW_RETURN (iovp,
00516 iovec[total_tuples],
00517 -1);
00518 #endif
00519
00520 va_start (argp, n);
00521
00522 for (size_t i = 0; i < total_tuples; i++)
00523 {
00524 iovp[i].iov_base = va_arg (argp, char *);
00525 iovp[i].iov_len = va_arg (argp, ssize_t);
00526 }
00527
00528 ssize_t result = ACE_OS::recvv (this->get_handle (),
00529 iovp,
00530 total_tuples);
00531 #if !defined (ACE_HAS_ALLOCA)
00532 delete [] iovp;
00533 #endif
00534 va_end (argp);
00535 return result;
00536 }
00537 #endif
00538
00539 ACE_END_VERSIONED_NAMESPACE_DECL
00540
00541 #endif