00001
00002
00003 #include "orbsvcs/Notify/Routing_Slip.h"
00004
00005 #include "orbsvcs/Notify/Delivery_Request.h"
00006 #include "orbsvcs/Notify/Worker_Task.h"
00007 #include "orbsvcs/Notify/ProxyConsumer.h"
00008 #include "orbsvcs/Notify/ProxySupplier.h"
00009 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00010 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00011 #include "orbsvcs/Notify/Routing_Slip_Queue.h"
00012 #include "orbsvcs/Notify/Method_Request_Lookup.h"
00013 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00014
00015 #include "tao/debug.h"
00016 #include "tao/corba.h"
00017
00018 #include "ace/Dynamic_Service.h"
00019 #include "ace/Truncate.h"
00020
00021
00022 #ifndef DEBUG_LEVEL
00023 # define DEBUG_LEVEL TAO_debug_level
00024 #endif //DEBUG_LEVEL
00025
00026 #define QUEUE_ALLOWED 1
00027
00028 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00029
00030 namespace TAO_Notify
00031 {
00032
00033
00034
00035 Routing_Slip_Queue Routing_Slip::persistent_queue_(QUEUE_ALLOWED);
00036
00037 TAO_SYNCH_MUTEX Routing_Slip::sequence_lock_;
00038 int Routing_Slip::routing_slip_sequence_= 0;
00039 size_t Routing_Slip::count_enter_transient_ = 0;
00040 size_t Routing_Slip::count_continue_transient_ = 0;
00041 size_t Routing_Slip::count_enter_reloaded_ = 0;
00042 size_t Routing_Slip::count_enter_new_ = 0;
00043 size_t Routing_Slip::count_continue_new_ = 0;
00044 size_t Routing_Slip::count_enter_complete_while_new_ = 0;
00045 size_t Routing_Slip::count_enter_saving_ = 0;
00046 size_t Routing_Slip::count_enter_saved_ = 0;
00047 size_t Routing_Slip::count_enter_updating_ = 0;
00048 size_t Routing_Slip::count_enter_changed_while_saving_ = 0;
00049 size_t Routing_Slip::count_continue_changed_while_saving_ = 0;
00050 size_t Routing_Slip::count_enter_changed_ = 0;
00051 size_t Routing_Slip::count_continue_changed_ = 0;
00052 size_t Routing_Slip::count_enter_complete_ = 0;
00053 size_t Routing_Slip::count_enter_deleting_ = 0;
00054 size_t Routing_Slip::count_enter_terminal_ = 0;
00055
00056 Routing_Slip_Ptr
00057 Routing_Slip::create (const TAO_Notify_Event::Ptr& event)
00058 {
00059 Routing_Slip * prs;
00060 ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
00061 Routing_Slip_Ptr result(prs);
00062 result->this_ptr_ = result;
00063
00064
00065 if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
00066 {
00067 ACE_ERROR ((LM_ERROR,
00068 ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
00069 ACE_TEXT (" enter_transient \t%d\n")
00070 ACE_TEXT (" continue_transient \t%d\n")
00071 ACE_TEXT (" enter_reloaded \t%d\n")
00072 ACE_TEXT (" enter_new \t%d\n")
00073 ACE_TEXT (" continue_new \t%d\n")
00074 ACE_TEXT (" enter_complete_while_new \t%d\n")
00075 ACE_TEXT (" enter_saving \t%d\n")
00076 ACE_TEXT (" enter_saved \t%d\n")
00077 ACE_TEXT (" enter_updating \t%d\n")
00078 ACE_TEXT (" enter_changed_while_saving \t%d\n")
00079 ACE_TEXT (" continue_changed_while_saving\t%d\n")
00080 ACE_TEXT (" enter_changed \t%d\n")
00081 ACE_TEXT (" continue_changed \t%d\n")
00082 ACE_TEXT (" enter_complete \t%d\n")
00083 ACE_TEXT (" enter_deleting \t%d\n")
00084 ACE_TEXT (" enter_terminal \t%d\n")
00085 , static_cast<int> (count_enter_transient_)
00086 , static_cast<int> (count_continue_transient_)
00087 , static_cast<int> (count_enter_reloaded_)
00088 , static_cast<int> (count_enter_new_)
00089 , static_cast<int> (count_continue_new_)
00090 , static_cast<int> (count_enter_complete_while_new_)
00091 , static_cast<int> (count_enter_saving_)
00092 , static_cast<int> (count_enter_saved_)
00093 , static_cast<int> (count_enter_updating_)
00094 , static_cast<int> (count_enter_changed_while_saving_)
00095 , static_cast<int> (count_continue_changed_while_saving_)
00096 , static_cast<int> (count_enter_changed_)
00097 , static_cast<int> (count_continue_changed_)
00098 , static_cast<int> (count_enter_complete_)
00099 , static_cast<int> (count_enter_deleting_)
00100 , static_cast<int> (count_enter_terminal_)
00101 ));
00102 }
00103 return result;
00104 }
00105
00106
00107 Routing_Slip_Ptr
00108 Routing_Slip::create (
00109 TAO_Notify_EventChannelFactory & ecf,
00110 Routing_Slip_Persistence_Manager * rspm)
00111 {
00112 Routing_Slip_Ptr result;
00113 ACE_Message_Block * event_mb = 0;
00114 ACE_Message_Block * rs_mb = 0;
00115 try
00116 {
00117 if (rspm->reload (event_mb, rs_mb))
00118 {
00119 TAO_InputCDR cdr_event (event_mb);
00120 TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
00121 if (event.isSet())
00122 {
00123 result = create (event);
00124 TAO_InputCDR cdr_rs (rs_mb);
00125 if ( result->unmarshal (ecf, cdr_rs))
00126 {
00127 result->set_rspm (rspm);
00128 }
00129 else
00130 {
00131 ACE_ERROR ((LM_ERROR,
00132 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
00133 ));
00134 result.reset ();
00135 }
00136 }
00137 else
00138 {
00139 ACE_ERROR ((LM_ERROR,
00140 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
00141 ));
00142 }
00143 }
00144 }
00145 catch (const CORBA::Exception&)
00146 {
00147 ACE_ERROR ((LM_ERROR,
00148 ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
00149 ));
00150 }
00151 delete event_mb;
00152 delete rs_mb;
00153
00154 return result;
00155 }
00156
00157 void
00158 Routing_Slip::set_rspm (Routing_Slip_Persistence_Manager * rspm)
00159 {
00160 this->rspm_ = rspm;
00161 if (rspm_ != 0)
00162 {
00163 rspm->set_callback (this);
00164 }
00165 }
00166
00167 Routing_Slip::Routing_Slip(
00168 const TAO_Notify_Event::Ptr& event)
00169 : is_safe_ (false)
00170 , until_safe_ (internals_)
00171 , this_ptr_ (0)
00172 , event_(event)
00173 , state_ (rssCREATING)
00174 , complete_requests_ (0)
00175 , rspm_ (0)
00176 {
00177 Routing_Slip_Guard guard (sequence_lock_);
00178 this->sequence_ = ++routing_slip_sequence_;
00179 if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
00180 ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
00181 this->sequence_
00182 ));
00183 }
00184
00185 Routing_Slip::~Routing_Slip ()
00186 {
00187 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00188 ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
00189 this->sequence_
00190 ));
00191 }
00192
00193 bool
00194 Routing_Slip::create_persistence_manager()
00195 {
00196 if (this->rspm_ == 0)
00197 {
00198 Event_Persistence_Strategy * strategy =
00199 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00200 if (strategy != 0)
00201 {
00202 Event_Persistence_Factory * factory = strategy->get_factory ();
00203 if (factory != 0)
00204 {
00205 set_rspm (factory->create_routing_slip_persistence_manager(this));
00206 }
00207 }
00208 }
00209 return this->rspm_ != 0;
00210 }
00211
00212 const TAO_Notify_Event::Ptr &
00213 Routing_Slip::event () const
00214 {
00215 return this->event_;
00216 }
00217
00218 void
00219 Routing_Slip::wait_persist ()
00220 {
00221 Routing_Slip_Guard guard (this->internals_);
00222 while (!this->is_safe_)
00223 {
00224 this->until_safe_.wait ();
00225 }
00226 }
00227
00228 void
00229 Routing_Slip::route (TAO_Notify_ProxyConsumer* pc, bool reliable_channel)
00230 {
00231 ACE_ASSERT(pc != 0);
00232
00233 TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
00234
00235 Routing_Slip_Guard guard (this->internals_);
00236
00237 size_t request_id = delivery_requests_.size ();
00238
00239 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00240 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
00241 this->sequence_,
00242 static_cast<int> (request_id),
00243 static_cast<int> (this->complete_requests_),
00244 static_cast<int> (this->delivery_requests_.size ())
00245 ));
00246
00247 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00248 this->delivery_requests_.push_back (request);
00249 TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
00250
00251 if (this->state_ == rssCREATING)
00252 {
00253 if (! reliable_channel)
00254 {
00255 enter_state_transient (guard);
00256 }
00257 else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
00258 {
00259 enter_state_transient (guard);
00260 }
00261 else if (! this->event_->reliable().is_valid())
00262 {
00263 enter_state_new (guard);
00264 }
00265 else if (this->event_->reliable().value() == true)
00266 {
00267 enter_state_new (guard);
00268 }
00269 else
00270 {
00271 enter_state_transient (guard);
00272 }
00273 }
00274 else
00275 {
00276
00277
00278
00279 guard.release ();
00280 }
00281 pc->execute_task (method);
00282 }
00283 #if 0 // forward
00284 void
00285 Routing_Slip::forward (TAO_Notify_ProxySupplier* ps, bool filter)
00286 {
00287
00288 ACE_ASSERT (this->state_ == rssCREATING);
00289
00290 TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00291 Routing_Slip_Guard guard (this->internals_);
00292
00293 enter_state_transient (guard);
00294 guard.acquire();
00295 size_t request_id = delivery_requests_.size ();
00296
00297 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00298 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Forward %s; completed %d of %d\n"),
00299 this->sequence_,
00300 static_cast<int> (request_id),
00301 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00302 static_cast<int> (this->complete_requests_),
00303 static_cast<int> (this->delivery_requests_.size ())
00304 ));
00305
00306 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00307 if (! ps->has_shutdown() )
00308 {
00309 this->delivery_requests_.push_back (request);
00310
00311 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00312 guard.release ();
00313 if (DEBUG_LEVEL > 8)
00314 ACE_DEBUG ((LM_DEBUG,
00315 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00316 "proxy supplier %d\n",
00317 this->sequence_,
00318 static_cast<int> (request_id),
00319 ps->id()));
00320 ps->worker_task()->execute (method);
00321 }
00322 else
00323 {
00324 if (DEBUG_LEVEL > 5)
00325 ACE_DEBUG ((LM_DEBUG,
00326 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00327 "proxy supplier %d; already shut down\n",
00328 this->sequence_,
00329 static_cast<int> (request_id),
00330 ps->id()));
00331 }
00332 }
00333 #endif // forward
00334
00335 void
00336 Routing_Slip::dispatch (
00337 TAO_Notify_ProxySupplier* ps,
00338 bool filter)
00339 {
00340
00341 ACE_ASSERT (this->state_ != rssCREATING);
00342
00343 TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00344 Routing_Slip_Guard guard (this->internals_);
00345
00346 size_t request_id = delivery_requests_.size ();
00347
00348 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00349 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
00350 this->sequence_,
00351 static_cast<int> (request_id),
00352 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00353 static_cast<int> (this->complete_requests_),
00354 static_cast<int> (this->delivery_requests_.size ())
00355 ));
00356
00357 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00358 if (! ps->has_shutdown() )
00359 {
00360 this->delivery_requests_.push_back (request);
00361 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00362 guard.release ();
00363 if (DEBUG_LEVEL > 8)
00364 ACE_DEBUG ((LM_DEBUG,
00365 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00366 "proxy supplier %d\n",
00367 this->sequence_,
00368 static_cast<int> (request_id),
00369 ps->id()));
00370 ps->execute_task (method);
00371 }
00372 else
00373 {
00374 if (DEBUG_LEVEL > 5)
00375 ACE_DEBUG ((LM_DEBUG,
00376 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00377 "proxy supplier %d; already shut down\n",
00378 this->sequence_,
00379 static_cast<int> (request_id),
00380 ps->id()));
00381 }
00382 }
00383
00384
00385
00386
00387 void
00388 Routing_Slip::delivery_request_complete (size_t request_id)
00389 {
00390 Routing_Slip_Guard guard (this->internals_);
00391 ACE_ASSERT (request_id < this->delivery_requests_.size ());
00392
00393 this->delivery_requests_[request_id].reset ();
00394 this->complete_requests_ += 1;
00395
00396 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00397 ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
00398 this->sequence_,
00399 static_cast<int> (request_id),
00400 static_cast<int> (this->complete_requests_),
00401 static_cast<int> (this->delivery_requests_.size ())
00402 ));
00403 State state = this->state_;
00404 switch (state)
00405 {
00406 case rssTRANSIENT:
00407 {
00408 continue_state_transient (guard);
00409 break;
00410 }
00411 case rssNEW:
00412 {
00413 continue_state_new (guard);
00414 break;
00415 }
00416 case rssSAVING:
00417 {
00418 enter_state_changed_while_saving (guard);
00419 break;
00420 }
00421 case rssUPDATING:
00422 {
00423 enter_state_changed_while_saving (guard);
00424 break;
00425 }
00426 case rssSAVED:
00427 {
00428 enter_state_changed (guard);
00429 break;
00430 }
00431 case rssCHANGED_WHILE_SAVING:
00432 {
00433 continue_state_changed_while_saving (guard);
00434 break;
00435 }
00436 case rssCHANGED:
00437 {
00438 continue_state_changed (guard);
00439 break;
00440 }
00441 default:
00442 {
00443 ACE_ERROR ((LM_ERROR,
00444 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
00445 static_cast<int> (this->state_)
00446 ));
00447 break;
00448 }
00449 }
00450 }
00451
00452 void
00453 Routing_Slip::at_front_of_persist_queue ()
00454 {
00455 Routing_Slip_Guard guard (this->internals_);
00456 State state = this->state_;
00457 switch (state)
00458 {
00459 case rssNEW:
00460 {
00461 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00462 ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
00463 this->sequence_
00464 ));
00465 enter_state_saving (guard);
00466 break;
00467 }
00468 case rssCOMPLETE_WHILE_NEW:
00469 {
00470 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00471 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
00472 this->sequence_
00473 ));
00474 guard.release ();
00475 this->persistent_queue_.complete ();
00476 enter_state_terminal (guard);
00477 break;
00478 }
00479 case rssCHANGED:
00480 {
00481 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00482 ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
00483 this->sequence_
00484 ));
00485 enter_state_updating (guard);
00486 break;
00487 }
00488 case rssCOMPLETE:
00489 {
00490 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00491 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
00492 this->sequence_
00493 ));
00494 enter_state_deleting (guard);
00495 break;
00496 }
00497 default:
00498 {
00499 ACE_ERROR ((LM_ERROR,
00500 ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
00501 this->sequence_,
00502 static_cast<int> (this->state_)
00503 ));
00504 break;
00505 }
00506 }
00507 }
00508
00509 void
00510 Routing_Slip::persist_complete ()
00511 {
00512
00513 Routing_Slip_Ptr me(this->this_ptr_);
00514 Routing_Slip_Guard guard (this->internals_);
00515 ACE_ASSERT (guard.locked ());
00516
00517
00518 if (! is_safe_)
00519 {
00520 is_safe_ = true;
00521 this->until_safe_.signal ();
00522 }
00523
00524 State state = this->state_;
00525 switch (state)
00526 {
00527 case rssSAVING:
00528 {
00529 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00530 ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
00531 this->sequence_
00532 ));
00533 enter_state_saved(guard);
00534 break;
00535 }
00536 case rssCHANGED_WHILE_SAVING:
00537 {
00538 enter_state_changed (guard);
00539 break;
00540 }
00541 case rssUPDATING:
00542 {
00543 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00544 ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
00545 this->sequence_
00546 ));
00547 enter_state_saved (guard);
00548 break;
00549 }
00550 case rssDELETING:
00551 {
00552 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00553 ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
00554 this->sequence_
00555 ));
00556 enter_state_terminal (guard);
00557 break;
00558 }
00559 default:
00560 {
00561 ACE_ERROR ((LM_ERROR,
00562 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
00563 static_cast<int> (this->state_)
00564 ));
00565 guard.release ();
00566 break;
00567 }
00568 }
00569 this->persistent_queue_.complete ();
00570 }
00571
00572
00573
00574
00575 bool
00576 Routing_Slip::all_deliveries_complete () const
00577 {
00578 return this->complete_requests_ == this->delivery_requests_.size ();
00579 }
00580
00581 void
00582 Routing_Slip::add_to_persist_queue(Routing_Slip_Guard & guard)
00583 {
00584 guard.release ();
00585 this->persistent_queue_.add (this->this_ptr_);
00586 }
00587
00588
00589
00590
00591 void
00592 Routing_Slip::enter_state_new (Routing_Slip_Guard & guard)
00593 {
00594 ++count_enter_new_;
00595 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00596 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
00597 this->sequence_
00598 ));
00599 this->state_ = rssNEW;
00600 add_to_persist_queue(guard);
00601 }
00602
00603 void
00604 Routing_Slip::continue_state_new (Routing_Slip_Guard & guard)
00605 {
00606 ++count_continue_new_;
00607 if (all_deliveries_complete ())
00608 {
00609 this->enter_state_complete_while_new (guard);
00610 }
00611 guard.release ();
00612 }
00613 void
00614 Routing_Slip::enter_state_complete_while_new (Routing_Slip_Guard & guard)
00615 {
00616 ++count_enter_complete_while_new_;
00617 ACE_UNUSED_ARG (guard);
00618 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00619 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
00620 this->sequence_
00621 ));
00622
00623 if (! is_safe_)
00624 {
00625 is_safe_ = true;
00626 this->until_safe_.signal ();
00627 }
00628 this->state_ = rssCOMPLETE_WHILE_NEW;
00629 }
00630
00631 void
00632 Routing_Slip::enter_state_reloaded (Routing_Slip_Guard & guard)
00633 {
00634 ++count_enter_reloaded_;
00635 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00636 ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"),
00637 this->sequence_
00638 ));
00639 this->state_ = rssRELOADED;
00640 guard.release();
00641 }
00642
00643 void
00644 Routing_Slip::enter_state_transient (Routing_Slip_Guard & guard)
00645 {
00646 ++count_enter_transient_;
00647 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00648 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
00649 this->sequence_
00650 ));
00651 this->state_ = rssTRANSIENT;
00652 if (! is_safe_)
00653 {
00654 is_safe_ = true;
00655 this->until_safe_.signal ();
00656 }
00657 if (all_deliveries_complete ())
00658 {
00659 enter_state_terminal (guard);
00660 }
00661 else
00662 {
00663 guard.release ();
00664 }
00665 }
00666
00667 void
00668 Routing_Slip::continue_state_transient (Routing_Slip_Guard & guard)
00669 {
00670 ++count_continue_transient_;
00671 if (all_deliveries_complete ())
00672 {
00673 enter_state_terminal (guard);
00674 }
00675 else
00676 {
00677 guard.release ();
00678 }
00679 }
00680 void
00681 Routing_Slip::enter_state_saving (Routing_Slip_Guard & guard)
00682 {
00683 ++count_enter_saving_;
00684 if (!create_persistence_manager ())
00685 {
00686
00687
00688 guard.release ();
00689 this->persistent_queue_.complete ();
00690 enter_state_transient (guard);
00691 }
00692 else
00693 {
00694 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00695 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
00696 this->sequence_
00697 ));
00698 this->state_ = rssSAVING;
00699
00700 TAO_OutputCDR event_cdr;
00701 this->event_->marshal (event_cdr);
00702
00703 const ACE_Message_Block *event_mb = event_cdr.begin ();
00704 TAO_OutputCDR rs_cdr;
00705 marshal (rs_cdr);
00706 const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00707
00708 guard.release ();
00709 this->rspm_->store (*event_mb, *rs_mb);
00710 }
00711 }
00712
00713 void
00714 Routing_Slip::enter_state_saved (Routing_Slip_Guard & guard)
00715 {
00716 ++count_enter_saved_;
00717 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00718 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
00719 this->sequence_
00720 ));
00721 this->state_ = rssSAVED;
00722 guard.release ();
00723 }
00724
00725 void
00726 Routing_Slip::enter_state_updating (Routing_Slip_Guard & guard)
00727 {
00728 ++count_enter_updating_;
00729 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00730 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
00731 this->sequence_
00732 ));
00733 this->state_ = rssUPDATING;
00734
00735 TAO_OutputCDR rs_cdr;
00736 marshal (rs_cdr);
00737 const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00738 guard.release ();
00739
00740 ACE_ASSERT (this->rspm_ != 0);
00741 this->rspm_->update (*rs_mb);
00742 }
00743
00744
00745 void
00746 Routing_Slip::enter_state_changed_while_saving (Routing_Slip_Guard & guard)
00747 {
00748 ++count_enter_changed_while_saving_;
00749 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00750 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
00751 this->sequence_
00752 ));
00753 this->state_ = rssCHANGED_WHILE_SAVING;
00754 guard.release ();
00755 }
00756
00757 void
00758 Routing_Slip::continue_state_changed_while_saving (Routing_Slip_Guard & guard)
00759 {
00760
00761 guard.release ();
00762 }
00763
00764 void
00765 Routing_Slip::enter_state_changed (Routing_Slip_Guard & guard)
00766 {
00767 ++count_enter_changed_;
00768 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00769 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
00770 this->sequence_
00771 ));
00772
00773
00774 this->state_ = rssCHANGED;
00775 if (all_deliveries_complete ())
00776 {
00777 enter_state_complete (guard);
00778 }
00779 add_to_persist_queue (guard);
00780 }
00781
00782 void
00783 Routing_Slip::continue_state_changed (Routing_Slip_Guard & guard)
00784 {
00785 ++count_continue_changed_;
00786 if (all_deliveries_complete ())
00787 {
00788 enter_state_complete (guard);
00789 }
00790 else
00791 {
00792 guard.release ();
00793 }
00794 }
00795
00796 void
00797 Routing_Slip::enter_state_complete (Routing_Slip_Guard & guard)
00798 {
00799 ++count_enter_complete_;
00800 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00801 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
00802 this->sequence_
00803 ));
00804 this->state_ = rssCOMPLETE;
00805 guard.release ();
00806 }
00807
00808 void
00809 Routing_Slip::enter_state_deleting (Routing_Slip_Guard & guard)
00810 {
00811 ++count_enter_deleting_;
00812 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00813 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
00814 this->sequence_
00815 ));
00816 this->state_ = rssDELETING;
00817 guard.release ();
00818 this->rspm_->remove ();
00819 }
00820
00821 void
00822 Routing_Slip::enter_state_terminal (Routing_Slip_Guard & guard)
00823 {
00824 ++count_enter_terminal_;
00825 ACE_ASSERT( this->is_safe_);
00826 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00827 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
00828 this->sequence_
00829 ));
00830 this->state_ = rssTERMINAL;
00831 this->this_ptr_.reset ();
00832 guard.release ();
00833 }
00834
00835 void
00836 Routing_Slip::marshal (TAO_OutputCDR & cdr)
00837 {
00838 size_t request_count = this->delivery_requests_.size();
00839 cdr.write_ulong (
00840 ACE_Utils::truncate_cast<CORBA::ULong> (request_count - this->complete_requests_));
00841 for (size_t nreq = 0; nreq < request_count; ++nreq)
00842 {
00843 Delivery_Request * request = this->delivery_requests_[nreq].get ();
00844 if (request != 0)
00845 {
00846 request->marshal (cdr);
00847 }
00848 }
00849 }
00850
00851 bool
00852 Routing_Slip::unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR & cdr)
00853 {
00854 CORBA::ULong count = 0;
00855 cdr.read_ulong (count);
00856 for (size_t nreq = 0; nreq < count; ++nreq)
00857 {
00858 ACE_CDR::Octet code = 0;
00859 while (cdr.read_octet(code))
00860 {
00861 try
00862 {
00863 if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
00864 {
00865 Delivery_Request * prequest;
00866 ACE_NEW_THROW_EX (
00867 prequest,
00868 Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
00869 CORBA::NO_MEMORY ());
00870 Delivery_Request_Ptr request(prequest);
00871 TAO_Notify_Method_Request_Dispatch_Queueable * method =
00872 TAO_Notify_Method_Request_Dispatch::unmarshal (
00873 request,
00874 ecf,
00875 cdr);
00876 if (method != 0)
00877 {
00878 this->delivery_requests_.push_back (request);
00879 this->delivery_methods_.push_back (method);
00880 }
00881 }
00882 else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
00883 {
00884 Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
00885 TAO_Notify_Method_Request_Lookup_Queueable * method =
00886 TAO_Notify_Method_Request_Lookup::unmarshal (
00887 request,
00888 ecf,
00889 cdr);
00890 if (method != 0)
00891 {
00892 this->delivery_requests_.push_back (request);
00893 this->delivery_methods_.push_back (method);
00894 }
00895 }
00896 }
00897 catch (const CORBA::Exception&)
00898 {
00899
00900
00901 }
00902 }
00903 }
00904 return this->delivery_requests_.size () > 0;
00905 }
00906
00907 void
00908 Routing_Slip::reconnect (void)
00909 {
00910 Routing_Slip_Guard guard (this->internals_);
00911 enter_state_saved (guard);
00912
00913
00914 size_t count = this->delivery_methods_.size ();
00915 for (size_t nmethod = 0; nmethod < count; ++nmethod)
00916 {
00917 this->delivery_methods_[nmethod]->execute ();
00918 }
00919 this->delivery_methods_.clear ();
00920 }
00921
00922 int
00923 Routing_Slip::sequence() const
00924 {
00925 return this->sequence_;
00926 }
00927
00928 bool
00929 Routing_Slip::should_retry () const
00930 {
00931
00932
00933 return this->state_ != rssTRANSIENT;
00934 }
00935
00936 }
00937
00938 TAO_END_VERSIONED_NAMESPACE_DECL