00001 
00002 
00003 
00004 #include "ace/MEM_IO.h"
00005 #include "ace/Handle_Set.h"
00006 
00007 #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
00008 
00009 #if !defined (__ACE_INLINE__)
00010 #include "ace/MEM_IO.inl"
00011 #endif 
00012 
00013 ACE_RCSID(ace, MEM_IO, "$Id: MEM_IO.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00014 
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO)
00018 
00019 ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO (void)
00020 {
00021 }
00022 
00023 int
00024 ACE_Reactive_MEM_IO::init (ACE_HANDLE handle,
00025                            const ACE_TCHAR *name,
00026                            MALLOC_OPTIONS *options)
00027 {
00028   ACE_TRACE ("ACE_Reactive_MEM_IO::init");
00029   this->handle_ = handle;
00030   return this->create_shm_malloc (name,
00031                                   options);
00032 }
00033 
00034 ssize_t
00035 ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00036                                int flags,
00037                                const ACE_Time_Value *timeout)
00038 {
00039   ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
00040 
00041   if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00042     return -1;
00043 
00044   ACE_OFF_T new_offset = 0;
00045   ssize_t retv = ACE::recv (this->handle_,
00046                             (char *) &new_offset,
00047                             sizeof (ACE_OFF_T),
00048                             flags,
00049                             timeout);
00050 
00051   if (retv == 0)
00052     {
00053       
00054       buf = 0;
00055       return 0;
00056     }
00057   else if (retv != sizeof (ACE_OFF_T))
00058     {
00059       
00060       buf = 0;
00061       return -1;
00062     }
00063 
00064   return this->get_buf_len (new_offset, buf);
00065 }
00066 
00067 ssize_t
00068 ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00069                                int flags,
00070                                const ACE_Time_Value *timeout)
00071 {
00072   ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
00073 
00074   if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00075     return -1;
00076 
00077   ACE_OFF_T offset = reinterpret_cast<char *> (buf) -
00078     static_cast<char *> (this->shm_malloc_->base_addr ()); 
00079   
00080   if (ACE::send (this->handle_,
00081                  (const char *) &offset,
00082                  sizeof (offset),
00083                  flags,
00084                  timeout) != sizeof (offset))
00085     {
00086       
00087       this->release_buffer (buf);
00088 
00089       return -1;
00090     }
00091   return buf->size ();
00092 }
00093 
00094 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00095 int
00096 ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node)
00097 {
00098   if (this->mq_ == 0)
00099     return -1;
00100 
00101   
00102   
00103   if (this->mq_->tail_.addr () == 0)     
00104     {
00105       this->mq_->head_ = new_node;
00106       this->mq_->tail_ = new_node;
00107       new_node->next_ = 0;
00108     }
00109   else
00110     {
00111       this->mq_->tail_->next_ = new_node;
00112       new_node->next_ = 0;
00113       this->mq_->tail_ = new_node;
00114     }
00115   return 0;
00116 }
00117 
00118 ACE_MEM_SAP_Node *
00119 ACE_MT_MEM_IO::Simple_Queue::read ()
00120 {
00121   if (this->mq_ == 0)
00122     return 0;
00123 
00124   ACE_MEM_SAP_Node *retv = 0;
00125 
00126   ACE_SEH_TRY
00127     {
00128       retv = this->mq_->head_;
00129       
00130       
00131       if (this->mq_->head_ == this->mq_->tail_)
00132         {
00133           
00134           this->mq_->head_ = 0;
00135           this->mq_->tail_ = 0;
00136         }
00137       else
00138         this->mq_->head_ = retv->next_;
00139     }
00140   ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
00141     {
00142     }
00143 
00144   return retv;
00145 }
00146 
00147 ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
00148 {
00149   delete this->recv_channel_.sema_;
00150   delete this->recv_channel_.lock_;
00151   delete this->send_channel_.sema_;
00152   delete this->send_channel_.lock_;
00153 }
00154 
00155 int
00156 ACE_MT_MEM_IO::init (ACE_HANDLE handle,
00157                      const ACE_TCHAR *name,
00158                      MALLOC_OPTIONS *options)
00159 {
00160   ACE_TRACE ("ACE_MT_MEM_IO::init");
00161   ACE_UNUSED_ARG (handle);
00162 
00163   
00164   
00165   
00166   if (this->create_shm_malloc (name, options) == -1)
00167     return -1;
00168 
00169   ACE_TCHAR server_sema [MAXPATHLEN];
00170   ACE_TCHAR client_sema [MAXPATHLEN];
00171   ACE_TCHAR server_lock [MAXPATHLEN];
00172   ACE_TCHAR client_lock [MAXPATHLEN];
00173   const ACE_TCHAR *basename = ACE::basename (name);
00174   
00175 
00176   
00177   ACE_OS::strcpy (server_sema, basename);
00178   ACE_OS::strcat (server_sema, ACE_TEXT ("_sema_to_server"));
00179   ACE_OS::strcpy (client_sema, basename);
00180   ACE_OS::strcat (client_sema, ACE_TEXT ("_sema_to_client"));
00181   ACE_OS::strcpy (server_lock, basename);
00182   ACE_OS::strcat (server_lock, ACE_TEXT ("_lock_to_server"));
00183   ACE_OS::strcpy (client_lock, basename);
00184   ACE_OS::strcat (client_lock, ACE_TEXT ("_lock_to_client"));
00185 
00186   void *to_server_ptr = 0;
00187   
00188   
00189   
00190   
00191   if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1)
00192     {
00193       void *ptr = 0;
00194       
00195       ACE_ALLOCATOR_RETURN (ptr,
00196                             this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)),
00197                             -1);
00198 
00199       MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (ptr);
00200       mymq->tail_ = 0;
00201       mymq->head_ = 0;
00202       (mymq + 1)->tail_ = 0;
00203       (mymq + 1)->head_ = 0;
00204       if (this->shm_malloc_->bind ("to_server", mymq) == -1)
00205         return -1;
00206 
00207       if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1)
00208         return -1;
00209 
00210       this->recv_channel_.queue_.init (mymq, this->shm_malloc_);
00211       ACE_NEW_RETURN (this->recv_channel_.sema_,
00212                       ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00213                       -1);
00214       ACE_NEW_RETURN (this->recv_channel_.lock_,
00215                       ACE_SYNCH_PROCESS_MUTEX (server_lock),
00216                       -1);
00217 
00218       this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_);
00219       ACE_NEW_RETURN (this->send_channel_.sema_,
00220                       ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00221                       -1);
00222       ACE_NEW_RETURN (this->send_channel_.lock_,
00223                       ACE_SYNCH_PROCESS_MUTEX (client_lock),
00224                       -1);
00225     }
00226   else
00227     {
00228       
00229       MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (to_server_ptr);
00230       this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_);
00231       ACE_NEW_RETURN (this->recv_channel_.sema_,
00232                       ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00233                       -1);
00234       ACE_NEW_RETURN (this->recv_channel_.lock_,
00235                       ACE_SYNCH_PROCESS_MUTEX (client_lock),
00236                       -1);
00237 
00238       this->send_channel_.queue_.init (mymq, this->shm_malloc_);
00239       ACE_NEW_RETURN (this->send_channel_.sema_,
00240                       ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00241                       -1);
00242       ACE_NEW_RETURN (this->send_channel_.lock_,
00243                       ACE_SYNCH_PROCESS_MUTEX (server_lock),
00244                       -1);
00245     }
00246   return 0;
00247 }
00248 
00249 ssize_t
00250 ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00251                          int flags,
00252                          const ACE_Time_Value *timeout)
00253 {
00254   ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
00255 
00256   
00257   ACE_UNUSED_ARG (timeout);
00258   ACE_UNUSED_ARG (flags);
00259 
00260   if (this->shm_malloc_ == 0)
00261     return -1;
00262 
00263   
00264   if (this->recv_channel_.sema_->acquire () == -1)
00265     return -1;
00266 
00267   {
00268     
00269     ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1);
00270 
00271     buf = this->recv_channel_.queue_.read ();
00272     if (buf != 0)
00273       return buf->size ();
00274     return -1;
00275   }
00276 }
00277 
00278 ssize_t
00279 ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00280                          int flags,
00281                          const ACE_Time_Value *timeout)
00282 {
00283   ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
00284 
00285   
00286   ACE_UNUSED_ARG (timeout);
00287   ACE_UNUSED_ARG (flags);
00288 
00289   if (this->shm_malloc_ == 0)
00290     return -1;
00291 
00292   {
00293     
00294     ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1);
00295 
00296     if (this->send_channel_.queue_.write (buf) == -1)
00297       {
00298         this->release_buffer (buf);
00299         return -1;
00300       }
00301   }
00302 
00303   if (this->send_channel_.sema_->release () == -1)
00304     return -1;
00305 
00306   return buf->size ();
00307 }
00308 #endif 
00309 
00310 void
00311 ACE_MEM_IO::dump (void) const
00312 {
00313 #if defined (ACE_HAS_DUMP)
00314   ACE_TRACE ("ACE_MEM_IO::dump");
00315 #endif 
00316 }
00317 
00318 int
00319 ACE_MEM_IO::init (const ACE_TCHAR *name,
00320                   ACE_MEM_IO::Signal_Strategy type,
00321                   ACE_MEM_SAP::MALLOC_OPTIONS *options)
00322 {
00323   ACE_UNUSED_ARG (type);
00324 
00325   delete this->deliver_strategy_;
00326   this->deliver_strategy_ = 0;
00327   switch (type)
00328     {
00329     case ACE_MEM_IO::Reactive:
00330       ACE_NEW_RETURN (this->deliver_strategy_,
00331                       ACE_Reactive_MEM_IO (),
00332                       -1);
00333       break;
00334 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00335     case ACE_MEM_IO::MT:
00336       ACE_NEW_RETURN (this->deliver_strategy_,
00337                       ACE_MT_MEM_IO (),
00338                       -1);
00339       break;
00340 #endif 
00341     default:
00342       return -1;
00343     }
00344 
00345   return this->deliver_strategy_->init (this->get_handle (),
00346                                         name,
00347                                         options);
00348 }
00349 
00350 int
00351 ACE_MEM_IO::fini ()
00352 {
00353   if (this->deliver_strategy_ != 0)
00354     return this->deliver_strategy_->fini ();
00355   else
00356     return -1;
00357 }
00358 
00359 
00360 
00361 
00362 
00363 
00364 ssize_t
00365 ACE_MEM_IO::send (const ACE_Message_Block *message_block,
00366                   const ACE_Time_Value *timeout)
00367 {
00368   ACE_TRACE ("ACE_MEM_IO::send");
00369 
00370   if (this->deliver_strategy_ == 0)
00371     return -1;                  
00372 
00373   size_t len = message_block->total_length ();
00374 
00375   if (len != 0)
00376     {
00377       ACE_MEM_SAP_Node *buf =
00378         reinterpret_cast<ACE_MEM_SAP_Node *> (
00379           this->deliver_strategy_->acquire_buffer (len));
00380       size_t n = 0;
00381       while (message_block != 0)
00382         {
00383           ACE_OS::memcpy (static_cast<char *> (buf->data ()) + n,
00384                           message_block->rd_ptr (),
00385                           message_block->length ());
00386           n += message_block->length ();
00387 
00388           if (message_block->cont ())
00389             message_block = message_block->cont ();
00390           else
00391             message_block = message_block->next ();
00392         }
00393 
00394       buf->size_ = len;
00395 
00396       return this->deliver_strategy_->send_buf (buf,
00397                                                 0,
00398                                                 timeout);
00399     }
00400   return 0;
00401 }
00402 
00403 
00404 #if 0
00405 ssize_t
00406 ACE_MEM_IO::recvv (iovec *io_vec,
00407                    const ACE_Time_Value *timeout)
00408 {
00409   ACE_TRACE ("ACE_MEM_IO::recvv");
00410 #if defined (FIONREAD)
00411   ACE_Handle_Set handle_set;
00412   handle_set.reset ();
00413   handle_set.set_bit (this->get_handle ());
00414 
00415   io_vec->iov_base = 0;
00416 
00417   
00418   switch (ACE_OS::select (int (this->get_handle ()) + 1,
00419                           handle_set,
00420                           0, 0,
00421                           timeout))
00422     {
00423     case -1:
00424       return -1;
00425       
00426     case 0:
00427       errno = ETIME;
00428       return -1;
00429       
00430     default:
00431       
00432       break;
00433     }
00434 
00435   int inlen;
00436 
00437   if (ACE_OS::ioctl (this->get_handle (),
00438                      FIONREAD,
00439                      &inlen) == -1)
00440     return -1;
00441   else if (inlen > 0)
00442     {
00443       ACE_NEW_RETURN (io_vec->iov_base,
00444                       char[inlen],
00445                       -1);
00446       io_vec->iov_len = this->recv (io_vec->iov_base,
00447                                     inlen);
00448       return io_vec->iov_len;
00449     }
00450   else
00451     return 0;
00452 #else
00453   ACE_UNUSED_ARG (io_vec);
00454   ACE_UNUSED_ARG (timeout);
00455   ACE_NOTSUP_RETURN (-1);
00456 #endif 
00457 }
00458 
00459 
00460 
00461 
00462 
00463 
00464 ssize_t
00465 ACE_MEM_IO::send (size_t n, ...) const
00466 {
00467   ACE_TRACE ("ACE_MEM_IO::send");
00468 
00469   va_list argp;
00470   size_t total_tuples = n / 2;
00471   iovec *iovp;
00472 #if defined (ACE_HAS_ALLOCA)
00473   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00474 #else
00475   ACE_NEW_RETURN (iovp,
00476                   iovec[total_tuples],
00477                   -1);
00478 #endif 
00479 
00480   va_start (argp, n);
00481 
00482   for (size_t i = 0; i < total_tuples; i++)
00483     {
00484       iovp[i].iov_base = va_arg (argp, char *);
00485       iovp[i].iov_len = va_arg (argp, ssize_t);
00486     }
00487 
00488   ssize_t result = ACE_OS::sendv (this->get_handle (),
00489                                   iovp,
00490                                   total_tuples);
00491 #if !defined (ACE_HAS_ALLOCA)
00492   delete [] iovp;
00493 #endif 
00494   va_end (argp);
00495   return result;
00496 }
00497 
00498 
00499 
00500 
00501 
00502 
00503 
00504 ssize_t
00505 ACE_MEM_IO::recv (size_t n, ...) const
00506 {
00507   ACE_TRACE ("ACE_MEM_IO::recv");
00508 
00509   va_list argp;
00510   size_t total_tuples = n / 2;
00511   iovec *iovp;
00512 #if defined (ACE_HAS_ALLOCA)
00513   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00514 #else
00515   ACE_NEW_RETURN (iovp,
00516                   iovec[total_tuples],
00517                   -1);
00518 #endif 
00519 
00520   va_start (argp, n);
00521 
00522   for (size_t i = 0; i < total_tuples; i++)
00523     {
00524       iovp[i].iov_base = va_arg (argp, char *);
00525       iovp[i].iov_len = va_arg (argp, ssize_t);
00526     }
00527 
00528   ssize_t result = ACE_OS::recvv (this->get_handle (),
00529                                   iovp,
00530                                   total_tuples);
00531 #if !defined (ACE_HAS_ALLOCA)
00532   delete [] iovp;
00533 #endif 
00534   va_end (argp);
00535   return result;
00536 }
00537 #endif 
00538 
00539 ACE_END_VERSIONED_NAMESPACE_DECL
00540 
00541 #endif