Routing_Slip.cpp

Go to the documentation of this file.
00001 // Routing_Slip.cpp,v 1.12 2006/03/14 06:14:34 jtc Exp
00002 
00003 #include "orbsvcs/Notify/Routing_Slip.h"
00004 
00005 #include "orbsvcs/Notify/Delivery_Request.h"
00006 #include "orbsvcs/Notify/Worker_Task.h"
00007 #include "orbsvcs/Notify/ProxyConsumer.h"
00008 #include "orbsvcs/Notify/ProxySupplier.h"
00009 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00010 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00011 #include "orbsvcs/Notify/Routing_Slip_Queue.h"
00012 #include "orbsvcs/Notify/Method_Request_Lookup.h"
00013 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00014 
00015 #include "tao/debug.h"
00016 #include "tao/corba.h"
00017 
00018 #include "ace/Dynamic_Service.h"
00019 
// Un-comment the next line to pin the trace level for this file; with a
// value of 9 every DEBUG_LEVEL test below succeeds.
//#define DEBUG_LEVEL 9
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */

// Argument handed to the constructor of Routing_Slip::persistent_queue_.
#define QUEUE_ALLOWED 1
00026 
00027 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00028 
00029 namespace TAO_Notify
00030 {
00031 ///////////////////////
00032 // Routing_Slip Statics
00033 
00034 Routing_Slip_Queue Routing_Slip::persistent_queue_(QUEUE_ALLOWED);
00035 
00036 TAO_SYNCH_MUTEX Routing_Slip::sequence_lock_;
00037 int Routing_Slip::routing_slip_sequence_= 0;
00038 size_t Routing_Slip::count_enter_transient_ = 0;
00039 size_t Routing_Slip::count_continue_transient_ = 0;
00040 size_t Routing_Slip::count_enter_reloaded_ = 0;
00041 size_t Routing_Slip::count_enter_new_ = 0;
00042 size_t Routing_Slip::count_continue_new_ = 0;
00043 size_t Routing_Slip::count_enter_complete_while_new_ = 0;
00044 size_t Routing_Slip::count_enter_saving_ = 0;
00045 size_t Routing_Slip::count_enter_saved_ = 0;
00046 size_t Routing_Slip::count_enter_updating_ = 0;
00047 size_t Routing_Slip::count_enter_changed_while_saving_ = 0;
00048 size_t Routing_Slip::count_continue_changed_while_saving_ = 0;
00049 size_t Routing_Slip::count_enter_changed_ = 0;
00050 size_t Routing_Slip::count_continue_changed_ = 0;
00051 size_t Routing_Slip::count_enter_complete_ = 0;
00052 size_t Routing_Slip::count_enter_deleting_ = 0;
00053 size_t Routing_Slip::count_enter_terminal_ = 0;
00054 
00055 Routing_Slip_Ptr
00056 Routing_Slip::create (const TAO_Notify_Event::Ptr& event ACE_ENV_ARG_DECL)
00057 {
00058   Routing_Slip * prs;
00059   ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
00060   ACE_CHECK_RETURN (Routing_Slip_Ptr());
00061   Routing_Slip_Ptr result(prs);
00062   result->this_ptr_ = result; // let the pointers touch so they use the same ref count
00063 
00064   // note we don't care about ultra-precise stats, so no guard for these
00065   if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
00066   {
00067     ACE_ERROR ((LM_ERROR,
00068       ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
00069       ACE_TEXT ("  enter_transient              \t%d\n")
00070       ACE_TEXT ("  continue_transient           \t%d\n")
00071       ACE_TEXT ("  enter_reloaded               \t%d\n")
00072       ACE_TEXT ("  enter_new                    \t%d\n")
00073       ACE_TEXT ("  continue_new                 \t%d\n")
00074       ACE_TEXT ("  enter_complete_while_new     \t%d\n")
00075       ACE_TEXT ("  enter_saving                 \t%d\n")
00076       ACE_TEXT ("  enter_saved                  \t%d\n")
00077       ACE_TEXT ("  enter_updating               \t%d\n")
00078       ACE_TEXT ("  enter_changed_while_saving   \t%d\n")
00079       ACE_TEXT ("  continue_changed_while_saving\t%d\n")
00080       ACE_TEXT ("  enter_changed                \t%d\n")
00081       ACE_TEXT ("  continue_changed             \t%d\n")
00082       ACE_TEXT ("  enter_complete               \t%d\n")
00083       ACE_TEXT ("  enter_deleting               \t%d\n")
00084       ACE_TEXT ("  enter_terminal               \t%d\n")
00085       , static_cast<int> (count_enter_transient_)
00086       , static_cast<int> (count_continue_transient_)
00087       , static_cast<int> (count_enter_reloaded_)
00088       , static_cast<int> (count_enter_new_)
00089       , static_cast<int> (count_continue_new_)
00090       , static_cast<int> (count_enter_complete_while_new_)
00091       , static_cast<int> (count_enter_saving_)
00092       , static_cast<int> (count_enter_saved_)
00093       , static_cast<int> (count_enter_updating_)
00094       , static_cast<int> (count_enter_changed_while_saving_)
00095       , static_cast<int> (count_continue_changed_while_saving_)
00096       , static_cast<int> (count_enter_changed_)
00097       , static_cast<int> (count_continue_changed_)
00098       , static_cast<int> (count_enter_complete_)
00099       , static_cast<int> (count_enter_deleting_)
00100       , static_cast<int> (count_enter_terminal_)
00101       ));
00102   }
00103   return result;
00104 }
00105 
00106 // static
00107 Routing_Slip_Ptr
00108 Routing_Slip::create (
00109   TAO_Notify_EventChannelFactory & ecf,
00110   Routing_Slip_Persistence_Manager * rspm)
00111 {
00112   Routing_Slip_Ptr result;
00113   ACE_Message_Block * event_mb = 0;
00114   ACE_Message_Block * rs_mb = 0;
00115   ACE_DECLARE_NEW_ENV;
00116   ACE_TRY
00117     {
00118       if (rspm->reload (event_mb, rs_mb))
00119       {
00120         TAO_InputCDR cdr_event (event_mb);
00121         TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
00122         if (event.isSet())
00123         {
00124           result = create (event ACE_ENV_ARG_PARAMETER);
00125           ACE_TRY_CHECK;
00126           TAO_InputCDR cdr_rs (rs_mb);
00127           if ( result->unmarshal (ecf, cdr_rs))
00128           {
00129             result->set_rspm (rspm);
00130           }
00131           else
00132           {
00133             ACE_ERROR ((LM_ERROR,
00134               ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
00135               ));
00136             result.reset ();
00137           }
00138         }
00139         else
00140         {
00141           ACE_ERROR ((LM_ERROR,
00142             ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
00143             ));
00144         }
00145       }
00146     }
00147   ACE_CATCHANY
00148     {
00149       ACE_ERROR ((LM_ERROR,
00150         ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
00151         ));
00152     }
00153   ACE_ENDTRY;
00154   delete event_mb;
00155   delete rs_mb;
00156 
00157   return result;
00158 }
00159 
00160 void
00161 Routing_Slip::set_rspm (Routing_Slip_Persistence_Manager * rspm)
00162 {
00163   this->rspm_ = rspm;
00164   if (rspm_ != 0)
00165   {
00166     rspm->set_callback (this);
00167   }
00168 }
00169 
00170 Routing_Slip::Routing_Slip(
00171       const TAO_Notify_Event::Ptr& event)
00172   : is_safe_ (false)
00173   , until_safe_ (internals_)
00174   , this_ptr_ (0)
00175   , event_(event)
00176   , state_ (rssCREATING)
00177   , complete_requests_ (0)
00178   , rspm_ (0)
00179 {
00180   Routing_Slip_Guard guard (sequence_lock_);
00181   this->sequence_ = ++routing_slip_sequence_;
00182   if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
00183       ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
00184       this->sequence_
00185       ));
00186 }
00187 
00188 Routing_Slip::~Routing_Slip ()
00189 {
00190   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00191       ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
00192       this->sequence_
00193       ));
00194 }
00195 
00196 bool
00197 Routing_Slip::create_persistence_manager()
00198 {
00199   if (this->rspm_ == 0)
00200   {
00201     Event_Persistence_Strategy * strategy =
00202       ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00203     if (strategy != 0)
00204     {
00205       Event_Persistence_Factory * factory = strategy->get_factory ();
00206       if (factory != 0)
00207       {
00208         set_rspm (factory->create_routing_slip_persistence_manager(this));
00209       }
00210     }
00211   }
00212   return this->rspm_ != 0;
00213 }
00214 
00215 const TAO_Notify_Event::Ptr &
00216 Routing_Slip::event () const
00217 {
00218   return this->event_;
00219 }
00220 
00221 void
00222 Routing_Slip::wait_persist ()
00223 {
00224   Routing_Slip_Guard guard (this->internals_);
00225   while (!this->is_safe_)
00226   {
00227     this->until_safe_.wait ();
00228   }
00229 }
00230 
00231 void
00232 Routing_Slip::route (TAO_Notify_ProxyConsumer* pc, bool reliable_channel ACE_ENV_ARG_DECL)
00233 {
00234   ACE_ASSERT(pc != 0);
00235 
00236   TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
00237 
00238   Routing_Slip_Guard guard (this->internals_);
00239 
00240   size_t request_id = delivery_requests_.size ();
00241 
00242   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00243       ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
00244       this->sequence_,
00245       static_cast<int> (request_id),
00246       static_cast<int> (this->complete_requests_),
00247       static_cast<int> (this->delivery_requests_.size ())
00248       ));
00249 
00250   Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00251   this->delivery_requests_.push_back (request);
00252   TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
00253 
00254   if (this->state_ == rssCREATING)
00255   {
00256     if (! reliable_channel)
00257     {
00258       enter_state_transient (guard);
00259     }
00260     else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
00261     {
00262       enter_state_transient (guard);
00263     }
00264     else if (! this->event_->reliable().is_valid())
00265     {
00266       enter_state_new (guard);
00267     }
00268     else if (this->event_->reliable().value() == CosNotification::Persistent)
00269     {
00270       enter_state_new (guard);
00271     }
00272     else
00273     {
00274       enter_state_transient (guard);
00275     }
00276   }
00277   guard.release ();
00278   pc->execute_task (method ACE_ENV_ARG_PARAMETER);
00279 }
#if 0 // forward
// Disabled code, kept for reference: it is compiled out by the
// surrounding #if 0.  Forward the event straight to proxy supplier
// <ps> as the first action on a transient slip.
void
Routing_Slip::forward (TAO_Notify_ProxySupplier* ps, bool filter)
{
  // must be the first action
  ACE_ASSERT (this->state_ == rssCREATING);

  TAO_Notify_ProxySupplier::Ptr supplier_ref (ps);
  Routing_Slip_Guard guard (this->internals_);

  enter_state_transient (guard);
  const size_t request_id = this->delivery_requests_.size ();

  if (DEBUG_LEVEL > 8)
  {
    ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Forward %s; completed %d of %d\n"),
      this->sequence_,
      static_cast<int> (request_id),
      filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
      static_cast<int> (this->complete_requests_),
      static_cast<int> (this->delivery_requests_.size ())
      ));
  }

  Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
  if (ps->has_shutdown() )
    {
      if (DEBUG_LEVEL > 5)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
                      "proxy supplier %d; already shut down\n",
                      this->sequence_,
                      static_cast<int> (request_id),
                      ps->id()));
        }
      return;
    }

  this->delivery_requests_.push_back (request);
//      Delivery_Method_Dispatch method (request, ps, filter);
  TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
  guard.release ();
  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
                  "proxy supplier %d\n",
                  this->sequence_,
                  static_cast<int> (request_id),
                  ps->id()));
    }
  ps->worker_task()->execute (method);
}
#endif // forward
00330 
00331 void
00332 Routing_Slip::dispatch (
00333   TAO_Notify_ProxySupplier* ps,
00334   bool filter
00335   ACE_ENV_ARG_DECL)
00336 {
00337   // cannot be the first action
00338   ACE_ASSERT (this->state_ != rssCREATING);
00339 
00340   TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00341   Routing_Slip_Guard guard (this->internals_);
00342 
00343   size_t request_id = delivery_requests_.size ();
00344 
00345   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00346       ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
00347       this->sequence_,
00348       static_cast<int> (request_id),
00349       filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00350       static_cast<int> (this->complete_requests_),
00351       static_cast<int> (this->delivery_requests_.size ())
00352       ));
00353 
00354   Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00355   if (! ps->has_shutdown() )
00356     {
00357       this->delivery_requests_.push_back (request);
00358       TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00359       guard.release ();
00360       if (DEBUG_LEVEL > 8)
00361         ACE_DEBUG ((LM_DEBUG,
00362                     "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00363                     "proxy supplier %d\n",
00364                     this->sequence_,
00365                     static_cast<int> (request_id),
00366                     ps->id()));
00367       ps->execute_task (method ACE_ENV_ARG_PARAMETER);
00368       ACE_CHECK;
00369     }
00370   else
00371     {
00372       if (DEBUG_LEVEL > 5)
00373         ACE_DEBUG ((LM_DEBUG,
00374                     "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00375                     "proxy supplier %d; already shut down\n",
00376                     this->sequence_,
00377                     static_cast<int> (request_id),
00378                     ps->id()));
00379     }
00380 }
00381 
00382 //////////
00383 // signals
00384 
00385 void
00386 Routing_Slip::delivery_request_complete (size_t request_id)
00387 {
00388   Routing_Slip_Guard guard (this->internals_);
00389   ACE_ASSERT (request_id < this->delivery_requests_.size ());
00390   // reset the pointer to allow the delivery_request to be deleted.
00391   this->delivery_requests_[request_id].reset ();
00392   this->complete_requests_ += 1;
00393 
00394   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00395       ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
00396       this->sequence_,
00397       static_cast<int> (request_id),
00398       static_cast<int> (this->complete_requests_),
00399       static_cast<int> (this->delivery_requests_.size ())
00400       ));
00401   State state = this->state_;
00402   switch (state)
00403   {
00404     case rssTRANSIENT:
00405     {
00406       continue_state_transient (guard);
00407       break;
00408     }
00409     case rssNEW:
00410     {
00411       continue_state_new (guard);
00412       break;
00413     }
00414     case rssSAVING:
00415     {
00416       enter_state_changed_while_saving (guard);
00417       break;
00418     }
00419     case rssUPDATING:
00420     {
00421       enter_state_changed_while_saving (guard);
00422       break;
00423     }
00424     case rssSAVED:
00425     {
00426       enter_state_changed (guard);
00427       break;
00428     }
00429     case rssCHANGED_WHILE_SAVING:
00430     {
00431       continue_state_changed_while_saving (guard);
00432       break;
00433     }
00434     case rssCHANGED:
00435     {
00436       continue_state_changed (guard);
00437       break;
00438     }
00439     default:
00440     {
00441       ACE_ERROR ((LM_ERROR,
00442         ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
00443         static_cast<int> (this->state_)
00444         ));
00445       break;
00446     }
00447   }
00448 }
00449 
00450 void
00451 Routing_Slip::at_front_of_persist_queue ()
00452 {
00453   Routing_Slip_Guard guard (this->internals_);
00454   State state = this->state_;
00455   switch (state)
00456   {
00457     case rssNEW:
00458     {
00459       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00460         ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
00461         this->sequence_
00462         ));
00463       enter_state_saving (guard);
00464       break;
00465     }
00466     case rssCOMPLETE_WHILE_NEW:
00467     {
00468       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00469         ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
00470         this->sequence_
00471         ));
00472       this->persistent_queue_.complete ();
00473       enter_state_terminal (guard);
00474       break;
00475     }
00476     case rssCHANGED:
00477     {
00478       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00479         ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
00480         this->sequence_
00481         ));
00482       enter_state_updating (guard);
00483       break;
00484     }
00485     case rssCOMPLETE:
00486     {
00487       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00488         ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
00489         this->sequence_
00490         ));
00491       enter_state_deleting (guard);
00492       break;
00493     }
00494     default:
00495     {
00496       ACE_ERROR ((LM_ERROR,
00497         ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
00498         this->sequence_,
00499         static_cast<int> (this->state_)
00500         ));
00501       break;
00502     }
00503   }
00504 }
00505 
00506 void
00507 Routing_Slip::persist_complete ()
00508 {
00509   // keep this object around til this method returns.
00510   Routing_Slip_Ptr me(this->this_ptr_);
00511   Routing_Slip_Guard guard (this->internals_);
00512   ACE_ASSERT (guard.locked ());
00513 
00514   // allow the ConsumerProxy to return from the CORBA push call.
00515   if (! is_safe_)
00516   {
00517     is_safe_ = true;
00518     this->until_safe_.signal ();
00519   }
00520 
00521   State state = this->state_;
00522   switch (state)
00523   {
00524     case rssSAVING:
00525     {
00526       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00527         ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
00528         this->sequence_
00529         ));
00530       enter_state_saved(guard);
00531       break;
00532     }
00533     case rssCHANGED_WHILE_SAVING:
00534     {
00535       enter_state_changed (guard);
00536       break;
00537     }
00538     case rssUPDATING:
00539     {
00540       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00541         ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
00542         this->sequence_
00543         ));
00544       enter_state_saved (guard);
00545       break;
00546     }
00547     case rssDELETING:
00548     {
00549       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00550         ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
00551         this->sequence_
00552         ));
00553       enter_state_terminal (guard);
00554       break;
00555     }
00556     default:
00557     {
00558       ACE_ERROR ((LM_ERROR,
00559         ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
00560         static_cast<int> (this->state_)
00561         ));
00562       break;
00563     }
00564   }
00565   this->persistent_queue_.complete ();
00566 }
00567 
00568 //////////////////
00569 // support methods
00570 
00571 bool
00572 Routing_Slip::all_deliveries_complete () const
00573 {
00574   return this->complete_requests_ == this->delivery_requests_.size ();
00575 }
00576 
00577 void
00578 Routing_Slip::add_to_persist_queue(Routing_Slip_Guard & guard)
00579 {
00580   guard.release ();
00581   this->persistent_queue_.add (this->this_ptr_);
00582   guard.acquire (); // necessary?
00583 }
00584 
00585 ////////////////////
00586 // State transitions
00587 
00588 void
00589 Routing_Slip::enter_state_new (Routing_Slip_Guard & guard)
00590 {
00591   ++count_enter_new_;
00592   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00593       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
00594       this->sequence_
00595       ));
00596   this->state_ = rssNEW;
00597   add_to_persist_queue(guard);
00598 }
00599 
00600 void
00601 Routing_Slip::continue_state_new (Routing_Slip_Guard & guard)
00602 {
00603   ++count_continue_new_;
00604   if (all_deliveries_complete ())
00605   {
00606     this->enter_state_complete_while_new (guard);
00607   }
00608 }
00609 void
00610 Routing_Slip::enter_state_complete_while_new (Routing_Slip_Guard & guard)
00611 {
00612   ++count_enter_complete_while_new_;
00613   ACE_UNUSED_ARG (guard);
00614   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00615       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
00616       this->sequence_
00617       ));
00618   // allow the ConsumerProxy to return from the CORBA push call.
00619   if (! is_safe_)
00620   {
00621     is_safe_ = true;
00622     this->until_safe_.signal ();
00623   }
00624   this->state_ = rssCOMPLETE_WHILE_NEW;
00625 }
00626 
00627 void
00628 Routing_Slip::enter_state_reloaded (Routing_Slip_Guard & guard)
00629 {
00630   ++count_enter_reloaded_;
00631   ACE_UNUSED_ARG (guard);
00632   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00633       ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"),
00634       this->sequence_
00635       ));
00636   this->state_ = rssRELOADED;
00637 }
00638 
00639 void
00640 Routing_Slip::enter_state_transient (Routing_Slip_Guard & guard)
00641 {
00642   ++count_enter_transient_;
00643   ACE_UNUSED_ARG (guard);
00644   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00645       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
00646       this->sequence_
00647       ));
00648   this->state_ = rssTRANSIENT;
00649   if (! is_safe_)
00650   {
00651     is_safe_ = true;
00652     this->until_safe_.signal ();
00653   }
00654   if (all_deliveries_complete ())
00655   {
00656     enter_state_terminal (guard);
00657   }
00658 }
00659 
00660 void
00661 Routing_Slip::continue_state_transient (Routing_Slip_Guard & guard)
00662 {
00663   ++count_continue_transient_;
00664   if (all_deliveries_complete ())
00665   {
00666     enter_state_terminal (guard);
00667   }
00668 }
00669 void
00670 Routing_Slip::enter_state_saving (Routing_Slip_Guard & guard)
00671 {
00672   ++count_enter_saving_;
00673   if (!create_persistence_manager ())
00674   {
00675     // Note This should actually be a throw (out of memory)
00676     // but we cheat and make this a transient event.
00677     this->persistent_queue_.complete ();
00678     enter_state_transient (guard);
00679   }
00680   else
00681   {
00682     if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00683         ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
00684         this->sequence_
00685         ));
00686     this->state_ = rssSAVING;
00687 
00688     TAO_OutputCDR event_cdr;
00689     this->event_->marshal (event_cdr);
00690 
00691     const ACE_Message_Block *event_mb = event_cdr.begin ();
00692     TAO_OutputCDR rs_cdr;
00693     marshal (rs_cdr);
00694     const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00695 
00696     guard.release ();
00697     this->rspm_->store (*event_mb, *rs_mb);
00698 
00699     guard.acquire (); // necessary?
00700   }
00701 }
00702 
00703 void
00704 Routing_Slip::enter_state_saved (Routing_Slip_Guard & guard)
00705 {
00706   ++count_enter_saved_;
00707   ACE_UNUSED_ARG (guard);
00708   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00709       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
00710       this->sequence_
00711       ));
00712   this->state_ = rssSAVED;
00713 }
00714 
00715 void
00716 Routing_Slip::enter_state_updating (Routing_Slip_Guard & guard)
00717 {
00718   ++count_enter_updating_;
00719   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00720       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
00721       this->sequence_
00722       ));
00723   this->state_ = rssUPDATING;
00724 
00725   TAO_OutputCDR rs_cdr;
00726   marshal (rs_cdr);
00727   const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00728   guard.release ();
00729 
00730   ACE_ASSERT (this->rspm_ != 0);
00731   this->rspm_->update (*rs_mb);
00732   guard.acquire (); // necessary?
00733 }
00734 
00735 
00736 void
00737 Routing_Slip::enter_state_changed_while_saving (Routing_Slip_Guard & guard)
00738 {
00739   ++count_enter_changed_while_saving_;
00740   ACE_UNUSED_ARG (guard);
00741   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00742       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
00743       this->sequence_
00744       ));
00745   this->state_ = rssCHANGED_WHILE_SAVING;
00746 }
00747 
00748 void
00749 Routing_Slip::continue_state_changed_while_saving (Routing_Slip_Guard & guard)
00750 {
00751   ACE_UNUSED_ARG (guard);
00752   // no action necessary
00753 }
00754 
00755 void
00756 Routing_Slip::enter_state_changed (Routing_Slip_Guard & guard)
00757 {
00758   ++count_enter_changed_;
00759   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00760       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
00761       this->sequence_
00762       ));
00763   // complete state change BEFORE initiating request to avoid
00764   // race condition if request finishes before state is stable.
00765   this->state_ = rssCHANGED;
00766   if (all_deliveries_complete ())
00767   {
00768     enter_state_complete (guard);
00769   }
00770   add_to_persist_queue (guard);
00771 }
00772 
00773 void
00774 Routing_Slip::continue_state_changed (Routing_Slip_Guard & guard)
00775 {
00776   ++count_continue_changed_;
00777   if (all_deliveries_complete ())
00778   {
00779     enter_state_complete (guard);
00780   }
00781 }
00782 
00783 void
00784 Routing_Slip::enter_state_complete (Routing_Slip_Guard & guard)
00785 {
00786   ++count_enter_complete_;
00787   ACE_UNUSED_ARG (guard);
00788   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00789       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
00790       this->sequence_
00791       ));
00792   this->state_ = rssCOMPLETE;
00793 }
00794 
00795 void
00796 Routing_Slip::enter_state_deleting (Routing_Slip_Guard & guard)
00797 {
00798   ++count_enter_deleting_;
00799   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00800       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
00801       this->sequence_
00802       ));
00803   this->state_ = rssDELETING;
00804   guard.release ();
00805   this->rspm_->remove ();
00806   guard.acquire (); // necessary?
00807 }
00808 
00809 void
00810 Routing_Slip::enter_state_terminal (Routing_Slip_Guard & guard)
00811 {
00812   ++count_enter_terminal_;
00813   ACE_UNUSED_ARG (guard);
00814   ACE_ASSERT( this->is_safe_);
00815   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00816       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
00817       this->sequence_
00818       ));
00819   this->state_ = rssTERMINAL;
00820   this->this_ptr_.reset ();
00821 }
00822 
00823 void
00824 Routing_Slip::marshal (TAO_OutputCDR & cdr)
00825 {
00826   size_t request_count = this->delivery_requests_.size();
00827   cdr.write_ulong (request_count - this->complete_requests_);
00828   for (size_t nreq = 0; nreq < request_count; ++nreq)
00829   {
00830     Delivery_Request * request = this->delivery_requests_[nreq].get ();
00831     if (request != 0)
00832     {
00833       request->marshal (cdr);
00834     }
00835   }
00836 }
00837 
00838 bool
00839 Routing_Slip::unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR & cdr)
00840 {
00841   CORBA::ULong count = 0;
00842   cdr.read_ulong (count);
00843   for (size_t nreq = 0; nreq < count; ++nreq)
00844   {
00845     ACE_CDR::Octet code = 0;
00846     while (cdr.read_octet(code))
00847     {
00848       ACE_DECLARE_NEW_ENV;
00849       ACE_TRY
00850       {
00851         if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
00852         {
00853           Delivery_Request * prequest;
00854           ACE_NEW_THROW_EX (
00855             prequest,
00856             Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
00857             CORBA::NO_MEMORY ());
00858           ACE_TRY_CHECK;
00859           Delivery_Request_Ptr request(prequest);
00860           TAO_Notify_Method_Request_Dispatch_Queueable * method =
00861             TAO_Notify_Method_Request_Dispatch::unmarshal (
00862               request,
00863               ecf,
00864               cdr
00865               ACE_ENV_ARG_PARAMETER);
00866           ACE_TRY_CHECK;
00867           if (method != 0)
00868           {
00869             this->delivery_requests_.push_back (request);
00870             this->delivery_methods_.push_back (method);
00871           }
00872         }
00873         else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
00874         {
00875           Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
00876           TAO_Notify_Method_Request_Lookup_Queueable * method =
00877               TAO_Notify_Method_Request_Lookup::unmarshal (
00878                 request,
00879                 ecf,
00880                 cdr
00881                 ACE_ENV_ARG_PARAMETER);
00882           ACE_TRY_CHECK
00883           if (method != 0)
00884           {
00885             this->delivery_requests_.push_back (request);
00886             this->delivery_methods_.push_back (method);
00887           }
00888         }
00889       }
00890       ACE_CATCHANY;
00891       {
00892         // @@todo should we log this?
00893         // just ignore failures
00894       }
00895       ACE_ENDTRY;
00896     }
00897   }
00898   return this->delivery_requests_.size () > 0;
00899 }
00900 
00901 void
00902 Routing_Slip::reconnect (ACE_ENV_SINGLE_ARG_DECL)
00903 {
00904   Routing_Slip_Guard guard (this->internals_);
00905   enter_state_saved (guard);
00906   guard.release ();
00907   //@@todo is there a worker_task available to do this?
00908   size_t count = this->delivery_methods_.size ();
00909   for (size_t nmethod = 0; nmethod < count; ++nmethod)
00910   {
00911     this->delivery_methods_[nmethod]->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00912     ACE_CHECK;
00913   }
00914   this->delivery_methods_.clear ();
00915 }
00916 
00917 int
00918 Routing_Slip::sequence() const
00919 {
00920   return this->sequence_;
00921 }
00922 
00923 bool
00924 Routing_Slip::should_retry () const
00925 {
00926   // simple minded test: if it's transient, don't retry it
00927   // @@todo Eventually this should check timeout, discard policy, etc.
00928   return this->state_ != rssTRANSIENT;
00929 }
00930 
00931 } // namespace
00932 
00933 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:15 2006 for TAO_CosNotification by doxygen 1.3.6