MEM_IO.cpp

Go to the documentation of this file.
00001 // MEM_IO.cpp
00002 // MEM_IO.cpp,v 4.24 2006/03/14 21:15:49 sjiang Exp
00003 
00004 #include "ace/MEM_IO.h"
00005 #include "ace/Handle_Set.h"
00006 
00007 #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
00008 
00009 #if !defined (__ACE_INLINE__)
00010 #include "ace/MEM_IO.inl"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 ACE_RCSID(ace, MEM_IO, "MEM_IO.cpp,v 4.24 2006/03/14 21:15:49 sjiang Exp")
00014 
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO)
00018 
00019 ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO (void)
00020 {
00021 }
00022 
00023 int
00024 ACE_Reactive_MEM_IO::init (ACE_HANDLE handle,
00025                            const ACE_TCHAR *name,
00026                            MALLOC_OPTIONS *options)
00027 {
00028   ACE_TRACE ("ACE_Reactive_MEM_IO::init");
00029   this->handle_ = handle;
00030   return this->create_shm_malloc (name,
00031                                   options);
00032 }
00033 
00034 ssize_t
00035 ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00036                                int flags,
00037                                const ACE_Time_Value *timeout)
00038 {
00039   ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
00040 
00041   if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00042     return -1;
00043 
00044   off_t new_offset = 0;
00045   ssize_t retv = ACE::recv (this->handle_,
00046                             (char *) &new_offset,
00047                             sizeof (off_t),
00048                             flags,
00049                             timeout);
00050 
00051   if (retv == 0)
00052     {
00053       //      ACE_DEBUG ((LM_INFO, "MEM_Stream closed\n"));
00054       buf = 0;
00055       return 0;
00056     }
00057   else if (retv != sizeof (off_t))
00058     {
00059       //  Nothing available or we are really screwed.
00060       buf = 0;
00061       return -1;
00062     }
00063 
00064   return this->get_buf_len (new_offset, buf);
00065 }
00066 
00067 ssize_t
00068 ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00069                                int flags,
00070                                const ACE_Time_Value *timeout)
00071 {
00072   ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
00073 
00074   if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE)
00075     return -1;
00076 
00077   off_t offset = reinterpret_cast<char *> (buf) -
00078     static_cast<char *> (this->shm_malloc_->base_addr ()); // the offset.
00079   // Send the offset value over the socket.
00080   if (ACE::send (this->handle_,
00081                  (const char *) &offset,
00082                  sizeof (offset),
00083                  flags,
00084                  timeout) != sizeof (offset))
00085     {
00086       // unsucessful send, release the memory in the shared-memory.
00087       this->release_buffer (buf);
00088 
00089       return -1;
00090     }
00091   return buf->size ();
00092 }
00093 
00094 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00095 int
00096 ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node)
00097 {
00098   if (this->mq_ == 0)
00099     return -1;
00100 
00101   // Here, we assume we already have acquired the lock necessary.
00102   // And we are allowed to write.
00103   if (this->mq_->tail_.addr () == 0)     // nothing in the queue.
00104     {
00105       this->mq_->head_ = new_node;
00106       this->mq_->tail_ = new_node;
00107       new_node->next_ = 0;
00108     }
00109   else
00110     {
00111       this->mq_->tail_->next_ = new_node;
00112       new_node->next_ = 0;
00113       this->mq_->tail_ = new_node;
00114     }
00115   return 0;
00116 }
00117 
00118 ACE_MEM_SAP_Node *
00119 ACE_MT_MEM_IO::Simple_Queue::read ()
00120 {
00121   if (this->mq_ == 0)
00122     return 0;
00123 
00124   ACE_MEM_SAP_Node *retv = 0;
00125 
00126   ACE_SEH_TRY
00127     {
00128       retv = this->mq_->head_;
00129       // Here, we assume we already have acquired the lock necessary
00130       // and there are soemthing in the queue.
00131       if (this->mq_->head_ == this->mq_->tail_)
00132         {
00133           // Last message in the queue.
00134           this->mq_->head_ = 0;
00135           this->mq_->tail_ = 0;
00136         }
00137       else
00138         this->mq_->head_ = retv->next_;
00139     }
00140   ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
00141     {
00142     }
00143 
00144   return retv;
00145 }
00146 
00147 ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
00148 {
00149   delete this->recv_channel_.sema_;
00150   delete this->recv_channel_.lock_;
00151   delete this->send_channel_.sema_;
00152   delete this->send_channel_.lock_;
00153 }
00154 
00155 int
00156 ACE_MT_MEM_IO::init (ACE_HANDLE handle,
00157                      const ACE_TCHAR *name,
00158                      MALLOC_OPTIONS *options)
00159 {
00160   ACE_TRACE ("ACE_MT_MEM_IO::init");
00161   ACE_UNUSED_ARG (handle);
00162 
00163   // @@ Give me a rule on naming and how the queue should
00164   //    be kept in the shared memory and we are done
00165   //    with this.
00166   if (this->create_shm_malloc (name, options) == -1)
00167     return -1;
00168 
00169   ACE_TCHAR server_sema [MAXPATHLEN];
00170   ACE_TCHAR client_sema [MAXPATHLEN];
00171   ACE_TCHAR server_lock [MAXPATHLEN];
00172   ACE_TCHAR client_lock [MAXPATHLEN];
00173   const ACE_TCHAR *basename = ACE::basename (name);
00174   //  size_t baselen = ACE_OS::strlen (basename);
00175 
00176   // Building names.  @@ Check buffer overflow?
00177   ACE_OS::strcpy (server_sema, basename);
00178   ACE_OS::strcat (server_sema, ACE_LIB_TEXT ("_sema_to_server"));
00179   ACE_OS::strcpy (client_sema, basename);
00180   ACE_OS::strcat (client_sema, ACE_LIB_TEXT ("_sema_to_client"));
00181   ACE_OS::strcpy (server_lock, basename);
00182   ACE_OS::strcat (server_lock, ACE_LIB_TEXT ("_lock_to_server"));
00183   ACE_OS::strcpy (client_lock, basename);
00184   ACE_OS::strcat (client_lock, ACE_LIB_TEXT ("_lock_to_client"));
00185 
00186   void *to_server_ptr = 0;
00187   // @@ Here, we assume the shared memory fill will never be resued.
00188   //    So we can determine whether we are server or client by examining
00189   //    if the simple message queues have already been set up in
00190   //    the Malloc object or not.
00191   if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1)
00192     {
00193       void *ptr = 0;
00194       // We are server.
00195       ACE_ALLOCATOR_RETURN (ptr,
00196                             this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)),
00197                             -1);
00198 
00199       MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (ptr);
00200       mymq->tail_ = 0;
00201       mymq->head_ = 0;
00202       (mymq + 1)->tail_ = 0;
00203       (mymq + 1)->head_ = 0;
00204       if (this->shm_malloc_->bind ("to_server", mymq) == -1)
00205         return -1;
00206 
00207       if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1)
00208         return -1;
00209 
00210       this->recv_channel_.queue_.init (mymq, this->shm_malloc_);
00211       ACE_NEW_RETURN (this->recv_channel_.sema_,
00212                       ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00213                       -1);
00214       ACE_NEW_RETURN (this->recv_channel_.lock_,
00215                       ACE_SYNCH_PROCESS_MUTEX (server_lock),
00216                       -1);
00217 
00218       this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_);
00219       ACE_NEW_RETURN (this->send_channel_.sema_,
00220                       ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00221                       -1);
00222       ACE_NEW_RETURN (this->send_channel_.lock_,
00223                       ACE_SYNCH_PROCESS_MUTEX (client_lock),
00224                       -1);
00225     }
00226   else
00227     {
00228       // we are client.
00229       MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (to_server_ptr);
00230       this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_);
00231       ACE_NEW_RETURN (this->recv_channel_.sema_,
00232                       ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
00233                       -1);
00234       ACE_NEW_RETURN (this->recv_channel_.lock_,
00235                       ACE_SYNCH_PROCESS_MUTEX (client_lock),
00236                       -1);
00237 
00238       this->send_channel_.queue_.init (mymq, this->shm_malloc_);
00239       ACE_NEW_RETURN (this->send_channel_.sema_,
00240                       ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
00241                       -1);
00242       ACE_NEW_RETURN (this->send_channel_.lock_,
00243                       ACE_SYNCH_PROCESS_MUTEX (server_lock),
00244                       -1);
00245     }
00246   return 0;
00247 }
00248 
00249 ssize_t
00250 ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
00251                          int flags,
00252                          const ACE_Time_Value *timeout)
00253 {
00254   ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
00255 
00256   // @@ Don't know how to handle timeout yet.
00257   ACE_UNUSED_ARG (timeout);
00258   ACE_UNUSED_ARG (flags);
00259 
00260   if (this->shm_malloc_ == 0)
00261     return -1;
00262 
00263   // Need to handle timeout here.
00264   if (this->recv_channel_.sema_->acquire () == -1)
00265     return -1;
00266 
00267   {
00268     // @@ We can probably skip the lock in certain circumstance.
00269     ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1);
00270 
00271     buf = this->recv_channel_.queue_.read ();
00272     if (buf != 0)
00273       return buf->size ();
00274     return -1;
00275   }
00276 }
00277 
00278 ssize_t
00279 ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
00280                          int flags,
00281                          const ACE_Time_Value *timeout)
00282 {
00283   ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
00284 
00285   // @@ Don't know how to handle timeout yet.
00286   ACE_UNUSED_ARG (timeout);
00287   ACE_UNUSED_ARG (flags);
00288 
00289   if (this->shm_malloc_ == 0)
00290     return -1;
00291 
00292   {
00293     // @@ We can probably skip the lock in certain curcumstances.
00294     ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1);
00295 
00296     if (this->send_channel_.queue_.write (buf) == -1)
00297       {
00298         this->release_buffer (buf);
00299         return -1;
00300       }
00301   }
00302 
00303   if (this->send_channel_.sema_->release () == -1)
00304     return -1;
00305 
00306   return buf->size ();
00307 }
00308 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
00309 
00310 void
00311 ACE_MEM_IO::dump (void) const
00312 {
00313 #if defined (ACE_HAS_DUMP)
00314   ACE_TRACE ("ACE_MEM_IO::dump");
00315 #endif /* ACE_HAS_DUMP */
00316 }
00317 
00318 int
00319 ACE_MEM_IO::init (const ACE_TCHAR *name,
00320                   ACE_MEM_IO::Signal_Strategy type,
00321                   ACE_MEM_SAP::MALLOC_OPTIONS *options)
00322 {
00323   ACE_UNUSED_ARG (type);
00324 
00325   delete this->deliver_strategy_;
00326   this->deliver_strategy_ = 0;
00327   switch (type)
00328     {
00329     case ACE_MEM_IO::Reactive:
00330       ACE_NEW_RETURN (this->deliver_strategy_,
00331                       ACE_Reactive_MEM_IO (),
00332                       -1);
00333       break;
00334 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
00335     case ACE_MEM_IO::MT:
00336       ACE_NEW_RETURN (this->deliver_strategy_,
00337                       ACE_MT_MEM_IO (),
00338                       -1);
00339       break;
00340 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
00341     default:
00342       return -1;
00343     }
00344 
00345   return this->deliver_strategy_->init (this->get_handle (),
00346                                         name,
00347                                         options);
00348 }
00349 
00350 int
00351 ACE_MEM_IO::fini ()
00352 {
00353   if (this->deliver_strategy_ != 0)
00354     return this->deliver_strategy_->fini ();
00355   else
00356     return -1;
00357 }
00358 
00359 // Allows a client to read from a socket without having to provide
00360 // a buffer to read.  This method determines how much data is in the
00361 // socket, allocates a buffer of this size, reads in the data, and
00362 // returns the number of bytes read.
00363 
00364 ssize_t
00365 ACE_MEM_IO::send (const ACE_Message_Block *message_block,
00366                   const ACE_Time_Value *timeout)
00367 {
00368   ACE_TRACE ("ACE_MEM_IO::send");
00369 
00370   if (this->deliver_strategy_ == 0)
00371     return -1;                  // Something went seriously wrong.
00372 
00373   size_t len = message_block->total_length ();
00374 
00375   if (len != 0)
00376     {
00377       ACE_MEM_SAP_Node *buf =
00378         reinterpret_cast<ACE_MEM_SAP_Node *> (
00379           this->deliver_strategy_->acquire_buffer (len));
00380       size_t n = 0;
00381       while (message_block != 0)
00382         {
00383           ACE_OS::memcpy (static_cast<char *> (buf->data ()) + n,
00384                           message_block->rd_ptr (),
00385                           message_block->length ());
00386           n += message_block->length ();
00387 
00388           if (message_block->cont ())
00389             message_block = message_block->cont ();
00390           else
00391             message_block = message_block->next ();
00392         }
00393 
00394       buf->size_ = len;
00395 
00396       return this->deliver_strategy_->send_buf (buf,
00397                                                 0,
00398                                                 timeout);
00399     }
00400   return 0;
00401 }
00402 
00403 
00404 #if 0
00405 ssize_t
00406 ACE_MEM_IO::recvv (iovec *io_vec,
00407                    const ACE_Time_Value *timeout)
00408 {
00409   ACE_TRACE ("ACE_MEM_IO::recvv");
00410 #if defined (FIONREAD)
00411   ACE_Handle_Set handle_set;
00412   handle_set.reset ();
00413   handle_set.set_bit (this->get_handle ());
00414 
00415   io_vec->iov_base = 0;
00416 
00417   // Check the status of the current socket.
00418   switch (ACE_OS::select (int (this->get_handle ()) + 1,
00419                           handle_set,
00420                           0, 0,
00421                           timeout))
00422     {
00423     case -1:
00424       return -1;
00425       /* NOTREACHED */
00426     case 0:
00427       errno = ETIME;
00428       return -1;
00429       /* NOTREACHED */
00430     default:
00431       // Goes fine, fallthrough to get data
00432       break;
00433     }
00434 
00435   int inlen;
00436 
00437   if (ACE_OS::ioctl (this->get_handle (),
00438                      FIONREAD,
00439                      &inlen) == -1)
00440     return -1;
00441   else if (inlen > 0)
00442     {
00443       ACE_NEW_RETURN (io_vec->iov_base,
00444                       char[inlen],
00445                       -1);
00446       io_vec->iov_len = this->recv (io_vec->iov_base,
00447                                     inlen);
00448       return io_vec->iov_len;
00449     }
00450   else
00451     return 0;
00452 #else
00453   ACE_UNUSED_ARG (io_vec);
00454   ACE_UNUSED_ARG (timeout);
00455   ACE_NOTSUP_RETURN (-1);
00456 #endif /* FIONREAD */
00457 }
00458 
00459 // Send N char *ptrs and int lengths.  Note that the char *'s precede
00460 // the ints (basically, an varargs version of writev).  The count N is
00461 // the *total* number of trailing arguments, *not* a couple of the
00462 // number of tuple pairs!
00463 
00464 ssize_t
00465 ACE_MEM_IO::send (size_t n, ...) const
00466 {
00467   ACE_TRACE ("ACE_MEM_IO::send");
00468 
00469   va_list argp;
00470   size_t total_tuples = n / 2;
00471   iovec *iovp;
00472 #if defined (ACE_HAS_ALLOCA)
00473   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00474 #else
00475   ACE_NEW_RETURN (iovp,
00476                   iovec[total_tuples],
00477                   -1);
00478 #endif /* !defined (ACE_HAS_ALLOCA) */
00479 
00480   va_start (argp, n);
00481 
00482   for (size_t i = 0; i < total_tuples; i++)
00483     {
00484       iovp[i].iov_base = va_arg (argp, char *);
00485       iovp[i].iov_len = va_arg (argp, ssize_t);
00486     }
00487 
00488   ssize_t result = ACE_OS::sendv (this->get_handle (),
00489                                   iovp,
00490                                   total_tuples);
00491 #if !defined (ACE_HAS_ALLOCA)
00492   delete [] iovp;
00493 #endif /* !defined (ACE_HAS_ALLOCA) */
00494   va_end (argp);
00495   return result;
00496 }
00497 
00498 // This is basically an interface to ACE_OS::readv, that doesn't use
00499 // the struct iovec_Base explicitly.  The ... can be passed as an arbitrary
00500 // number of (char *ptr, int len) tuples.  However, the count N is the
00501 // *total* number of trailing arguments, *not* a couple of the number
00502 // of tuple pairs!
00503 
00504 ssize_t
00505 ACE_MEM_IO::recv (size_t n, ...) const
00506 {
00507   ACE_TRACE ("ACE_MEM_IO::recv");
00508 
00509   va_list argp;
00510   size_t total_tuples = n / 2;
00511   iovec *iovp;
00512 #if defined (ACE_HAS_ALLOCA)
00513   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00514 #else
00515   ACE_NEW_RETURN (iovp,
00516                   iovec[total_tuples],
00517                   -1);
00518 #endif /* !defined (ACE_HAS_ALLOCA) */
00519 
00520   va_start (argp, n);
00521 
00522   for (size_t i = 0; i < total_tuples; i++)
00523     {
00524       iovp[i].iov_base = va_arg (argp, char *);
00525       iovp[i].iov_len = va_arg (argp, ssize_t);
00526     }
00527 
00528   ssize_t result = ACE_OS::recvv (this->get_handle (),
00529                                   iovp,
00530                                   total_tuples);
00531 #if !defined (ACE_HAS_ALLOCA)
00532   delete [] iovp;
00533 #endif /* !defined (ACE_HAS_ALLOCA) */
00534   va_end (argp);
00535   return result;
00536 }
00537 #endif /* 0 */
00538 
00539 ACE_END_VERSIONED_NAMESPACE_DECL
00540 
00541 #endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */

Generated on Thu Nov 9 09:41:55 2006 for ACE by doxygen 1.3.6