Routing_Slip.cpp

Go to the documentation of this file.
00001 // $Id: Routing_Slip.cpp 79324 2007-08-13 11:20:01Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/Routing_Slip.h"
00004 
00005 #include "orbsvcs/Notify/Delivery_Request.h"
00006 #include "orbsvcs/Notify/Worker_Task.h"
00007 #include "orbsvcs/Notify/ProxyConsumer.h"
00008 #include "orbsvcs/Notify/ProxySupplier.h"
00009 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00010 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00011 #include "orbsvcs/Notify/Routing_Slip_Queue.h"
00012 #include "orbsvcs/Notify/Method_Request_Lookup.h"
00013 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00014 
00015 #include "tao/debug.h"
00016 #include "tao/corba.h"
00017 
00018 #include "ace/Dynamic_Service.h"
00019 #include "ace/Truncate.h"
00020 
00021 //#define DEBUG_LEVEL 9
00022 #ifndef DEBUG_LEVEL
00023 # define DEBUG_LEVEL TAO_debug_level
00024 #endif //DEBUG_LEVEL
00025 
00026 #define QUEUE_ALLOWED 1
00027 
00028 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00029 
00030 namespace TAO_Notify
00031 {
00032 ///////////////////////
00033 // Routing_Slip Statics
00034 
// Queue shared by all routing slips; slips are added to it by
// add_to_persist_queue() and it is told about finished work via complete().
// It is constructed with QUEUE_ALLOWED as its argument.
Routing_Slip_Queue Routing_Slip::persistent_queue_(QUEUE_ALLOWED);

// sequence_lock_ is held by the constructor while it increments
// routing_slip_sequence_ to number the new slip.
TAO_SYNCH_MUTEX Routing_Slip::sequence_lock_;
int Routing_Slip::routing_slip_sequence_= 0;
// State-transition counters.  Each one is incremented by the enter_state_* /
// continue_state_* method of the same name and printed by create() when the
// debug level is high enough.
size_t Routing_Slip::count_enter_transient_ = 0;
size_t Routing_Slip::count_continue_transient_ = 0;
size_t Routing_Slip::count_enter_reloaded_ = 0;
size_t Routing_Slip::count_enter_new_ = 0;
size_t Routing_Slip::count_continue_new_ = 0;
size_t Routing_Slip::count_enter_complete_while_new_ = 0;
size_t Routing_Slip::count_enter_saving_ = 0;
size_t Routing_Slip::count_enter_saved_ = 0;
size_t Routing_Slip::count_enter_updating_ = 0;
size_t Routing_Slip::count_enter_changed_while_saving_ = 0;
size_t Routing_Slip::count_continue_changed_while_saving_ = 0;
size_t Routing_Slip::count_enter_changed_ = 0;
size_t Routing_Slip::count_continue_changed_ = 0;
size_t Routing_Slip::count_enter_complete_ = 0;
size_t Routing_Slip::count_enter_deleting_ = 0;
size_t Routing_Slip::count_enter_terminal_ = 0;
00055 
00056 Routing_Slip_Ptr
00057 Routing_Slip::create (const TAO_Notify_Event::Ptr& event)
00058 {
00059   Routing_Slip * prs;
00060   ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
00061   Routing_Slip_Ptr result(prs);
00062   result->this_ptr_ = result; // let the pointers touch so they use the same ref count
00063 
00064   // note we don't care about ultra-precise stats, so no guard for these
00065   if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
00066   {
00067     ACE_ERROR ((LM_ERROR,
00068       ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
00069       ACE_TEXT ("  enter_transient              \t%d\n")
00070       ACE_TEXT ("  continue_transient           \t%d\n")
00071       ACE_TEXT ("  enter_reloaded               \t%d\n")
00072       ACE_TEXT ("  enter_new                    \t%d\n")
00073       ACE_TEXT ("  continue_new                 \t%d\n")
00074       ACE_TEXT ("  enter_complete_while_new     \t%d\n")
00075       ACE_TEXT ("  enter_saving                 \t%d\n")
00076       ACE_TEXT ("  enter_saved                  \t%d\n")
00077       ACE_TEXT ("  enter_updating               \t%d\n")
00078       ACE_TEXT ("  enter_changed_while_saving   \t%d\n")
00079       ACE_TEXT ("  continue_changed_while_saving\t%d\n")
00080       ACE_TEXT ("  enter_changed                \t%d\n")
00081       ACE_TEXT ("  continue_changed             \t%d\n")
00082       ACE_TEXT ("  enter_complete               \t%d\n")
00083       ACE_TEXT ("  enter_deleting               \t%d\n")
00084       ACE_TEXT ("  enter_terminal               \t%d\n")
00085       , static_cast<int> (count_enter_transient_)
00086       , static_cast<int> (count_continue_transient_)
00087       , static_cast<int> (count_enter_reloaded_)
00088       , static_cast<int> (count_enter_new_)
00089       , static_cast<int> (count_continue_new_)
00090       , static_cast<int> (count_enter_complete_while_new_)
00091       , static_cast<int> (count_enter_saving_)
00092       , static_cast<int> (count_enter_saved_)
00093       , static_cast<int> (count_enter_updating_)
00094       , static_cast<int> (count_enter_changed_while_saving_)
00095       , static_cast<int> (count_continue_changed_while_saving_)
00096       , static_cast<int> (count_enter_changed_)
00097       , static_cast<int> (count_continue_changed_)
00098       , static_cast<int> (count_enter_complete_)
00099       , static_cast<int> (count_enter_deleting_)
00100       , static_cast<int> (count_enter_terminal_)
00101       ));
00102   }
00103   return result;
00104 }
00105 
00106 // static
00107 Routing_Slip_Ptr
00108 Routing_Slip::create (
00109   TAO_Notify_EventChannelFactory & ecf,
00110   Routing_Slip_Persistence_Manager * rspm)
00111 {
00112   Routing_Slip_Ptr result;
00113   ACE_Message_Block * event_mb = 0;
00114   ACE_Message_Block * rs_mb = 0;
00115   try
00116     {
00117       if (rspm->reload (event_mb, rs_mb))
00118       {
00119         TAO_InputCDR cdr_event (event_mb);
00120         TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
00121         if (event.isSet())
00122         {
00123           result = create (event);
00124           TAO_InputCDR cdr_rs (rs_mb);
00125           if ( result->unmarshal (ecf, cdr_rs))
00126           {
00127             result->set_rspm (rspm);
00128           }
00129           else
00130           {
00131             ACE_ERROR ((LM_ERROR,
00132               ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
00133               ));
00134             result.reset ();
00135           }
00136         }
00137         else
00138         {
00139           ACE_ERROR ((LM_ERROR,
00140             ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
00141             ));
00142         }
00143       }
00144     }
00145   catch (const CORBA::Exception&)
00146     {
00147       ACE_ERROR ((LM_ERROR,
00148         ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
00149         ));
00150     }
00151   delete event_mb;
00152   delete rs_mb;
00153 
00154   return result;
00155 }
00156 
00157 void
00158 Routing_Slip::set_rspm (Routing_Slip_Persistence_Manager * rspm)
00159 {
00160   this->rspm_ = rspm;
00161   if (rspm_ != 0)
00162   {
00163     rspm->set_callback (this);
00164   }
00165 }
00166 
00167 Routing_Slip::Routing_Slip(
00168       const TAO_Notify_Event::Ptr& event)
00169   : is_safe_ (false)
00170   , until_safe_ (internals_)
00171   , this_ptr_ (0)
00172   , event_(event)
00173   , state_ (rssCREATING)
00174   , complete_requests_ (0)
00175   , rspm_ (0)
00176 {
00177   Routing_Slip_Guard guard (sequence_lock_);
00178   this->sequence_ = ++routing_slip_sequence_;
00179   if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
00180       ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
00181       this->sequence_
00182       ));
00183 }
00184 
00185 Routing_Slip::~Routing_Slip ()
00186 {
00187   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00188       ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
00189       this->sequence_
00190       ));
00191 }
00192 
00193 bool
00194 Routing_Slip::create_persistence_manager()
00195 {
00196   if (this->rspm_ == 0)
00197   {
00198     Event_Persistence_Strategy * strategy =
00199       ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00200     if (strategy != 0)
00201     {
00202       Event_Persistence_Factory * factory = strategy->get_factory ();
00203       if (factory != 0)
00204       {
00205         set_rspm (factory->create_routing_slip_persistence_manager(this));
00206       }
00207     }
00208   }
00209   return this->rspm_ != 0;
00210 }
00211 
00212 const TAO_Notify_Event::Ptr &
00213 Routing_Slip::event () const
00214 {
00215   return this->event_;
00216 }
00217 
00218 void
00219 Routing_Slip::wait_persist ()
00220 {
00221   Routing_Slip_Guard guard (this->internals_);
00222   while (!this->is_safe_)
00223   {
00224     this->until_safe_.wait ();
00225   }
00226 }
00227 
00228 void
00229 Routing_Slip::route (TAO_Notify_ProxyConsumer* pc, bool reliable_channel)
00230 {
00231   ACE_ASSERT(pc != 0);
00232 
00233   TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
00234 
00235   Routing_Slip_Guard guard (this->internals_);
00236 
00237   size_t request_id = delivery_requests_.size ();
00238 
00239   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00240       ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
00241       this->sequence_,
00242       static_cast<int> (request_id),
00243       static_cast<int> (this->complete_requests_),
00244       static_cast<int> (this->delivery_requests_.size ())
00245       ));
00246 
00247   Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00248   this->delivery_requests_.push_back (request);
00249   TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
00250 
00251   if (this->state_ == rssCREATING)
00252   {
00253     if (! reliable_channel)
00254     {
00255       enter_state_transient (guard);
00256     }
00257     else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
00258     {
00259       enter_state_transient (guard);
00260     }
00261     else if (! this->event_->reliable().is_valid())
00262     {
00263       enter_state_new (guard);
00264     }
00265     else if (this->event_->reliable().value() == true)
00266     {
00267       enter_state_new (guard);
00268     }
00269     else
00270     {
00271       enter_state_transient (guard);
00272     }
00273   }
00274   else
00275   {
00276     // We only need to release the guard if the state is rssCREATING.
00277     // By calling enter_state_*, we are guaranteed that the guard has
00278     // been released.
00279     guard.release ();
00280   }
00281   pc->execute_task (method);
00282 }
#if 0 // forward
// NOTE: everything up to the matching #endif is compiled out.
// forward() would put a slip that is still in rssCREATING straight into the
// transient state and then hand a no-copy dispatch request to the proxy
// supplier's worker task, unless the supplier has already shut down.
void
Routing_Slip::forward (TAO_Notify_ProxySupplier* ps, bool filter)
{
  // must be the first action
  ACE_ASSERT (this->state_ == rssCREATING);

  TAO_Notify_ProxySupplier::Ptr psgrd(ps);
  Routing_Slip_Guard guard (this->internals_);

  enter_state_transient (guard);
  guard.acquire();
  size_t request_id = delivery_requests_.size ();

  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Forward %s; completed %d of %d\n"),
      this->sequence_,
      static_cast<int> (request_id),
      filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
      static_cast<int> (this->complete_requests_),
      static_cast<int> (this->delivery_requests_.size ())
      ));

  Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
  if (! ps->has_shutdown() )
    {
      this->delivery_requests_.push_back (request);
//      Delivery_Method_Dispatch method (request, ps, filter);
      TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
      guard.release ();
      if (DEBUG_LEVEL > 8)
        ACE_DEBUG ((LM_DEBUG,
                    "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
                    "proxy supplier %d\n",
                    this->sequence_,
                    static_cast<int> (request_id),
                    ps->id()));
      ps->worker_task()->execute (method);
    }
  else
    {
      if (DEBUG_LEVEL > 5)
        ACE_DEBUG ((LM_DEBUG,
                    "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
                    "proxy supplier %d; already shut down\n",
                    this->sequence_,
                    static_cast<int> (request_id),
                    ps->id()));
    }
}
#endif // forward
00334 
00335 void
00336 Routing_Slip::dispatch (
00337   TAO_Notify_ProxySupplier* ps,
00338   bool filter)
00339 {
00340   // cannot be the first action
00341   ACE_ASSERT (this->state_ != rssCREATING);
00342 
00343   TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00344   Routing_Slip_Guard guard (this->internals_);
00345 
00346   size_t request_id = delivery_requests_.size ();
00347 
00348   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00349       ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
00350       this->sequence_,
00351       static_cast<int> (request_id),
00352       filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00353       static_cast<int> (this->complete_requests_),
00354       static_cast<int> (this->delivery_requests_.size ())
00355       ));
00356 
00357   Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00358   if (! ps->has_shutdown() )
00359     {
00360       this->delivery_requests_.push_back (request);
00361       TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00362       guard.release ();
00363       if (DEBUG_LEVEL > 8)
00364         ACE_DEBUG ((LM_DEBUG,
00365                     "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00366                     "proxy supplier %d\n",
00367                     this->sequence_,
00368                     static_cast<int> (request_id),
00369                     ps->id()));
00370       ps->execute_task (method);
00371     }
00372   else
00373     {
00374       if (DEBUG_LEVEL > 5)
00375         ACE_DEBUG ((LM_DEBUG,
00376                     "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00377                     "proxy supplier %d; already shut down\n",
00378                     this->sequence_,
00379                     static_cast<int> (request_id),
00380                     ps->id()));
00381     }
00382 }
00383 
00384 //////////
00385 // signals
00386 
00387 void
00388 Routing_Slip::delivery_request_complete (size_t request_id)
00389 {
00390   Routing_Slip_Guard guard (this->internals_);
00391   ACE_ASSERT (request_id < this->delivery_requests_.size ());
00392   // reset the pointer to allow the delivery_request to be deleted.
00393   this->delivery_requests_[request_id].reset ();
00394   this->complete_requests_ += 1;
00395 
00396   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00397       ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
00398       this->sequence_,
00399       static_cast<int> (request_id),
00400       static_cast<int> (this->complete_requests_),
00401       static_cast<int> (this->delivery_requests_.size ())
00402       ));
00403   State state = this->state_;
00404   switch (state)
00405   {
00406     case rssTRANSIENT:
00407     {
00408       continue_state_transient (guard);
00409       break;
00410     }
00411     case rssNEW:
00412     {
00413       continue_state_new (guard);
00414       break;
00415     }
00416     case rssSAVING:
00417     {
00418       enter_state_changed_while_saving (guard);
00419       break;
00420     }
00421     case rssUPDATING:
00422     {
00423       enter_state_changed_while_saving (guard);
00424       break;
00425     }
00426     case rssSAVED:
00427     {
00428       enter_state_changed (guard);
00429       break;
00430     }
00431     case rssCHANGED_WHILE_SAVING:
00432     {
00433       continue_state_changed_while_saving (guard);
00434       break;
00435     }
00436     case rssCHANGED:
00437     {
00438       continue_state_changed (guard);
00439       break;
00440     }
00441     default:
00442     {
00443       ACE_ERROR ((LM_ERROR,
00444         ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
00445         static_cast<int> (this->state_)
00446         ));
00447       break;
00448     }
00449   }
00450 }
00451 
00452 void
00453 Routing_Slip::at_front_of_persist_queue ()
00454 {
00455   Routing_Slip_Guard guard (this->internals_);
00456   State state = this->state_;
00457   switch (state)
00458   {
00459     case rssNEW:
00460     {
00461       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00462         ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
00463         this->sequence_
00464         ));
00465       enter_state_saving (guard);
00466       break;
00467     }
00468     case rssCOMPLETE_WHILE_NEW:
00469     {
00470       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00471         ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
00472         this->sequence_
00473         ));
00474       guard.release ();
00475       this->persistent_queue_.complete ();
00476       enter_state_terminal (guard);
00477       break;
00478     }
00479     case rssCHANGED:
00480     {
00481       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00482         ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
00483         this->sequence_
00484         ));
00485       enter_state_updating (guard);
00486       break;
00487     }
00488     case rssCOMPLETE:
00489     {
00490       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00491         ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
00492         this->sequence_
00493         ));
00494       enter_state_deleting (guard);
00495       break;
00496     }
00497     default:
00498     {
00499       ACE_ERROR ((LM_ERROR,
00500         ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
00501         this->sequence_,
00502         static_cast<int> (this->state_)
00503         ));
00504       break;
00505     }
00506   }
00507 }
00508 
00509 void
00510 Routing_Slip::persist_complete ()
00511 {
00512   // keep this object around til this method returns.
00513   Routing_Slip_Ptr me(this->this_ptr_);
00514   Routing_Slip_Guard guard (this->internals_);
00515   ACE_ASSERT (guard.locked ());
00516 
00517   // allow the ConsumerProxy to return from the CORBA push call.
00518   if (! is_safe_)
00519   {
00520     is_safe_ = true;
00521     this->until_safe_.signal ();
00522   }
00523 
00524   State state = this->state_;
00525   switch (state)
00526   {
00527     case rssSAVING:
00528     {
00529       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00530         ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
00531         this->sequence_
00532         ));
00533       enter_state_saved(guard);
00534       break;
00535     }
00536     case rssCHANGED_WHILE_SAVING:
00537     {
00538       enter_state_changed (guard);
00539       break;
00540     }
00541     case rssUPDATING:
00542     {
00543       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00544         ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
00545         this->sequence_
00546         ));
00547       enter_state_saved (guard);
00548       break;
00549     }
00550     case rssDELETING:
00551     {
00552       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00553         ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
00554         this->sequence_
00555         ));
00556       enter_state_terminal (guard);
00557       break;
00558     }
00559     default:
00560     {
00561       ACE_ERROR ((LM_ERROR,
00562         ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
00563         static_cast<int> (this->state_)
00564         ));
00565       guard.release ();
00566       break;
00567     }
00568   }
00569   this->persistent_queue_.complete ();
00570 }
00571 
00572 //////////////////
00573 // support methods
00574 
00575 bool
00576 Routing_Slip::all_deliveries_complete () const
00577 {
00578   return this->complete_requests_ == this->delivery_requests_.size ();
00579 }
00580 
00581 void
00582 Routing_Slip::add_to_persist_queue(Routing_Slip_Guard & guard)
00583 {
00584   guard.release ();
00585   this->persistent_queue_.add (this->this_ptr_);
00586 }
00587 
00588 ////////////////////
00589 // State transitions
00590 
00591 void
00592 Routing_Slip::enter_state_new (Routing_Slip_Guard & guard)
00593 {
00594   ++count_enter_new_;
00595   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00596       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
00597       this->sequence_
00598       ));
00599   this->state_ = rssNEW;
00600   add_to_persist_queue(guard);
00601 }
00602 
00603 void
00604 Routing_Slip::continue_state_new (Routing_Slip_Guard & guard)
00605 {
00606   ++count_continue_new_;
00607   if (all_deliveries_complete ())
00608   {
00609     this->enter_state_complete_while_new (guard);
00610   }
00611   guard.release ();
00612 }
00613 void
00614 Routing_Slip::enter_state_complete_while_new (Routing_Slip_Guard & guard)
00615 {
00616   ++count_enter_complete_while_new_;
00617   ACE_UNUSED_ARG (guard);
00618   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00619       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
00620       this->sequence_
00621       ));
00622   // allow the ConsumerProxy to return from the CORBA push call.
00623   if (! is_safe_)
00624   {
00625     is_safe_ = true;
00626     this->until_safe_.signal ();
00627   }
00628   this->state_ = rssCOMPLETE_WHILE_NEW;
00629 }
00630 
00631 void
00632 Routing_Slip::enter_state_reloaded (Routing_Slip_Guard & guard)
00633 {
00634   ++count_enter_reloaded_;
00635   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00636       ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"),
00637       this->sequence_
00638       ));
00639   this->state_ = rssRELOADED;
00640   guard.release();
00641 }
00642 
00643 void
00644 Routing_Slip::enter_state_transient (Routing_Slip_Guard & guard)
00645 {
00646   ++count_enter_transient_;
00647   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00648       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
00649       this->sequence_
00650       ));
00651   this->state_ = rssTRANSIENT;
00652   if (! is_safe_)
00653   {
00654     is_safe_ = true;
00655     this->until_safe_.signal ();
00656   }
00657   if (all_deliveries_complete ())
00658   {
00659     enter_state_terminal (guard);
00660   }
00661   else
00662   {
00663     guard.release ();
00664   }
00665 }
00666 
00667 void
00668 Routing_Slip::continue_state_transient (Routing_Slip_Guard & guard)
00669 {
00670   ++count_continue_transient_;
00671   if (all_deliveries_complete ())
00672   {
00673     enter_state_terminal (guard);
00674   }
00675   else
00676   {
00677     guard.release ();
00678   }
00679 }
00680 void
00681 Routing_Slip::enter_state_saving (Routing_Slip_Guard & guard)
00682 {
00683   ++count_enter_saving_;
00684   if (!create_persistence_manager ())
00685   {
00686     // Note This should actually be a throw (out of memory)
00687     // but we cheat and make this a transient event.
00688     guard.release ();
00689     this->persistent_queue_.complete ();
00690     enter_state_transient (guard);
00691   }
00692   else
00693   {
00694     if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00695         ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
00696         this->sequence_
00697         ));
00698     this->state_ = rssSAVING;
00699 
00700     TAO_OutputCDR event_cdr;
00701     this->event_->marshal (event_cdr);
00702 
00703     const ACE_Message_Block *event_mb = event_cdr.begin ();
00704     TAO_OutputCDR rs_cdr;
00705     marshal (rs_cdr);
00706     const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00707 
00708     guard.release ();
00709     this->rspm_->store (*event_mb, *rs_mb);
00710   }
00711 }
00712 
00713 void
00714 Routing_Slip::enter_state_saved (Routing_Slip_Guard & guard)
00715 {
00716   ++count_enter_saved_;
00717   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00718       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
00719       this->sequence_
00720       ));
00721   this->state_ = rssSAVED;
00722   guard.release ();
00723 }
00724 
00725 void
00726 Routing_Slip::enter_state_updating (Routing_Slip_Guard & guard)
00727 {
00728   ++count_enter_updating_;
00729   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00730       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
00731       this->sequence_
00732       ));
00733   this->state_ = rssUPDATING;
00734 
00735   TAO_OutputCDR rs_cdr;
00736   marshal (rs_cdr);
00737   const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00738   guard.release ();
00739 
00740   ACE_ASSERT (this->rspm_ != 0);
00741   this->rspm_->update (*rs_mb);
00742 }
00743 
00744 
00745 void
00746 Routing_Slip::enter_state_changed_while_saving (Routing_Slip_Guard & guard)
00747 {
00748   ++count_enter_changed_while_saving_;
00749   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00750       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
00751       this->sequence_
00752       ));
00753   this->state_ = rssCHANGED_WHILE_SAVING;
00754   guard.release ();
00755 }
00756 
00757 void
00758 Routing_Slip::continue_state_changed_while_saving (Routing_Slip_Guard & guard)
00759 {
00760   // no action necessary
00761   guard.release ();
00762 }
00763 
00764 void
00765 Routing_Slip::enter_state_changed (Routing_Slip_Guard & guard)
00766 {
00767   ++count_enter_changed_;
00768   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00769       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
00770       this->sequence_
00771       ));
00772   // complete state change BEFORE initiating request to avoid
00773   // race condition if request finishes before state is stable.
00774   this->state_ = rssCHANGED;
00775   if (all_deliveries_complete ())
00776   {
00777     enter_state_complete (guard);
00778   }
00779   add_to_persist_queue (guard);
00780 }
00781 
00782 void
00783 Routing_Slip::continue_state_changed (Routing_Slip_Guard & guard)
00784 {
00785   ++count_continue_changed_;
00786   if (all_deliveries_complete ())
00787   {
00788     enter_state_complete (guard);
00789   }
00790   else
00791   {
00792     guard.release ();
00793   }
00794 }
00795 
00796 void
00797 Routing_Slip::enter_state_complete (Routing_Slip_Guard & guard)
00798 {
00799   ++count_enter_complete_;
00800   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00801       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
00802       this->sequence_
00803       ));
00804   this->state_ = rssCOMPLETE;
00805   guard.release ();
00806 }
00807 
00808 void
00809 Routing_Slip::enter_state_deleting (Routing_Slip_Guard & guard)
00810 {
00811   ++count_enter_deleting_;
00812   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00813       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
00814       this->sequence_
00815       ));
00816   this->state_ = rssDELETING;
00817   guard.release ();
00818   this->rspm_->remove ();
00819 }
00820 
00821 void
00822 Routing_Slip::enter_state_terminal (Routing_Slip_Guard & guard)
00823 {
00824   ++count_enter_terminal_;
00825   ACE_ASSERT( this->is_safe_);
00826   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00827       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
00828       this->sequence_
00829       ));
00830   this->state_ = rssTERMINAL;
00831   this->this_ptr_.reset ();
00832   guard.release ();
00833 }
00834 
00835 void
00836 Routing_Slip::marshal (TAO_OutputCDR & cdr)
00837 {
00838   size_t request_count = this->delivery_requests_.size();
00839   cdr.write_ulong (
00840     ACE_Utils::truncate_cast<CORBA::ULong> (request_count - this->complete_requests_));
00841   for (size_t nreq = 0; nreq < request_count; ++nreq)
00842   {
00843     Delivery_Request * request = this->delivery_requests_[nreq].get ();
00844     if (request != 0)
00845     {
00846       request->marshal (cdr);
00847     }
00848   }
00849 }
00850 
00851 bool
00852 Routing_Slip::unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR & cdr)
00853 {
00854   CORBA::ULong count = 0;
00855   cdr.read_ulong (count);
00856   for (size_t nreq = 0; nreq < count; ++nreq)
00857   {
00858     ACE_CDR::Octet code = 0;
00859     while (cdr.read_octet(code))
00860     {
00861       try
00862       {
00863         if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
00864         {
00865           Delivery_Request * prequest;
00866           ACE_NEW_THROW_EX (
00867             prequest,
00868             Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
00869             CORBA::NO_MEMORY ());
00870           Delivery_Request_Ptr request(prequest);
00871           TAO_Notify_Method_Request_Dispatch_Queueable * method =
00872             TAO_Notify_Method_Request_Dispatch::unmarshal (
00873               request,
00874               ecf,
00875               cdr);
00876           if (method != 0)
00877           {
00878             this->delivery_requests_.push_back (request);
00879             this->delivery_methods_.push_back (method);
00880           }
00881         }
00882         else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
00883         {
00884           Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
00885           TAO_Notify_Method_Request_Lookup_Queueable * method =
00886               TAO_Notify_Method_Request_Lookup::unmarshal (
00887                 request,
00888                 ecf,
00889                 cdr);
00890           if (method != 0)
00891           {
00892             this->delivery_requests_.push_back (request);
00893             this->delivery_methods_.push_back (method);
00894           }
00895         }
00896       }
00897       catch (const CORBA::Exception&)
00898       {
00899         // @@todo should we log this?
00900         // just ignore failures
00901       }
00902     }
00903   }
00904   return this->delivery_requests_.size () > 0;
00905 }
00906 
00907 void
00908 Routing_Slip::reconnect (void)
00909 {
00910   Routing_Slip_Guard guard (this->internals_);
00911   enter_state_saved (guard);
00912 
00913   //@@todo is there a worker_task available to do this?
00914   size_t count = this->delivery_methods_.size ();
00915   for (size_t nmethod = 0; nmethod < count; ++nmethod)
00916   {
00917     this->delivery_methods_[nmethod]->execute ();
00918   }
00919   this->delivery_methods_.clear ();
00920 }
00921 
00922 int
00923 Routing_Slip::sequence() const
00924 {
00925   return this->sequence_;
00926 }
00927 
00928 bool
00929 Routing_Slip::should_retry () const
00930 {
00931   // simple minded test: if it's transient, don't retry it
00932   // @@todo Eventually this should check timeout, discard policy, etc.
00933   return this->state_ != rssTRANSIENT;
00934 }
00935 
00936 } // namespace
00937 
00938 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:29 2010 for TAO_CosNotification by  doxygen 1.4.7