00001
00002
00003 #include "orbsvcs/Notify/Routing_Slip.h"
00004
00005 #include "orbsvcs/Notify/Delivery_Request.h"
00006 #include "orbsvcs/Notify/Worker_Task.h"
00007 #include "orbsvcs/Notify/ProxyConsumer.h"
00008 #include "orbsvcs/Notify/ProxySupplier.h"
00009 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00010 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00011 #include "orbsvcs/Notify/Routing_Slip_Queue.h"
00012 #include "orbsvcs/Notify/Method_Request_Lookup.h"
00013 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00014
00015 #include "tao/debug.h"
00016 #include "tao/corba.h"
00017
00018 #include "ace/Dynamic_Service.h"
00019
00020
00021 #ifndef DEBUG_LEVEL
00022 # define DEBUG_LEVEL TAO_debug_level
00023 #endif //DEBUG_LEVEL
00024
00025 #define QUEUE_ALLOWED 1
00026
00027 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00028
00029 namespace TAO_Notify
00030 {
00031
00032
00033
00034 Routing_Slip_Queue Routing_Slip::persistent_queue_(QUEUE_ALLOWED);
00035
00036 TAO_SYNCH_MUTEX Routing_Slip::sequence_lock_;
00037 int Routing_Slip::routing_slip_sequence_= 0;
00038 size_t Routing_Slip::count_enter_transient_ = 0;
00039 size_t Routing_Slip::count_continue_transient_ = 0;
00040 size_t Routing_Slip::count_enter_reloaded_ = 0;
00041 size_t Routing_Slip::count_enter_new_ = 0;
00042 size_t Routing_Slip::count_continue_new_ = 0;
00043 size_t Routing_Slip::count_enter_complete_while_new_ = 0;
00044 size_t Routing_Slip::count_enter_saving_ = 0;
00045 size_t Routing_Slip::count_enter_saved_ = 0;
00046 size_t Routing_Slip::count_enter_updating_ = 0;
00047 size_t Routing_Slip::count_enter_changed_while_saving_ = 0;
00048 size_t Routing_Slip::count_continue_changed_while_saving_ = 0;
00049 size_t Routing_Slip::count_enter_changed_ = 0;
00050 size_t Routing_Slip::count_continue_changed_ = 0;
00051 size_t Routing_Slip::count_enter_complete_ = 0;
00052 size_t Routing_Slip::count_enter_deleting_ = 0;
00053 size_t Routing_Slip::count_enter_terminal_ = 0;
00054
00055 Routing_Slip_Ptr
00056 Routing_Slip::create (const TAO_Notify_Event::Ptr& event ACE_ENV_ARG_DECL)
00057 {
00058 Routing_Slip * prs;
00059 ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
00060 ACE_CHECK_RETURN (Routing_Slip_Ptr());
00061 Routing_Slip_Ptr result(prs);
00062 result->this_ptr_ = result;
00063
00064
00065 if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
00066 {
00067 ACE_ERROR ((LM_ERROR,
00068 ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
00069 ACE_TEXT (" enter_transient \t%d\n")
00070 ACE_TEXT (" continue_transient \t%d\n")
00071 ACE_TEXT (" enter_reloaded \t%d\n")
00072 ACE_TEXT (" enter_new \t%d\n")
00073 ACE_TEXT (" continue_new \t%d\n")
00074 ACE_TEXT (" enter_complete_while_new \t%d\n")
00075 ACE_TEXT (" enter_saving \t%d\n")
00076 ACE_TEXT (" enter_saved \t%d\n")
00077 ACE_TEXT (" enter_updating \t%d\n")
00078 ACE_TEXT (" enter_changed_while_saving \t%d\n")
00079 ACE_TEXT (" continue_changed_while_saving\t%d\n")
00080 ACE_TEXT (" enter_changed \t%d\n")
00081 ACE_TEXT (" continue_changed \t%d\n")
00082 ACE_TEXT (" enter_complete \t%d\n")
00083 ACE_TEXT (" enter_deleting \t%d\n")
00084 ACE_TEXT (" enter_terminal \t%d\n")
00085 , static_cast<int> (count_enter_transient_)
00086 , static_cast<int> (count_continue_transient_)
00087 , static_cast<int> (count_enter_reloaded_)
00088 , static_cast<int> (count_enter_new_)
00089 , static_cast<int> (count_continue_new_)
00090 , static_cast<int> (count_enter_complete_while_new_)
00091 , static_cast<int> (count_enter_saving_)
00092 , static_cast<int> (count_enter_saved_)
00093 , static_cast<int> (count_enter_updating_)
00094 , static_cast<int> (count_enter_changed_while_saving_)
00095 , static_cast<int> (count_continue_changed_while_saving_)
00096 , static_cast<int> (count_enter_changed_)
00097 , static_cast<int> (count_continue_changed_)
00098 , static_cast<int> (count_enter_complete_)
00099 , static_cast<int> (count_enter_deleting_)
00100 , static_cast<int> (count_enter_terminal_)
00101 ));
00102 }
00103 return result;
00104 }
00105
00106
00107 Routing_Slip_Ptr
00108 Routing_Slip::create (
00109 TAO_Notify_EventChannelFactory & ecf,
00110 Routing_Slip_Persistence_Manager * rspm)
00111 {
00112 Routing_Slip_Ptr result;
00113 ACE_Message_Block * event_mb = 0;
00114 ACE_Message_Block * rs_mb = 0;
00115 ACE_DECLARE_NEW_ENV;
00116 ACE_TRY
00117 {
00118 if (rspm->reload (event_mb, rs_mb))
00119 {
00120 TAO_InputCDR cdr_event (event_mb);
00121 TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
00122 if (event.isSet())
00123 {
00124 result = create (event ACE_ENV_ARG_PARAMETER);
00125 ACE_TRY_CHECK;
00126 TAO_InputCDR cdr_rs (rs_mb);
00127 if ( result->unmarshal (ecf, cdr_rs))
00128 {
00129 result->set_rspm (rspm);
00130 }
00131 else
00132 {
00133 ACE_ERROR ((LM_ERROR,
00134 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
00135 ));
00136 result.reset ();
00137 }
00138 }
00139 else
00140 {
00141 ACE_ERROR ((LM_ERROR,
00142 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
00143 ));
00144 }
00145 }
00146 }
00147 ACE_CATCHANY
00148 {
00149 ACE_ERROR ((LM_ERROR,
00150 ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
00151 ));
00152 }
00153 ACE_ENDTRY;
00154 delete event_mb;
00155 delete rs_mb;
00156
00157 return result;
00158 }
00159
00160 void
00161 Routing_Slip::set_rspm (Routing_Slip_Persistence_Manager * rspm)
00162 {
00163 this->rspm_ = rspm;
00164 if (rspm_ != 0)
00165 {
00166 rspm->set_callback (this);
00167 }
00168 }
00169
00170 Routing_Slip::Routing_Slip(
00171 const TAO_Notify_Event::Ptr& event)
00172 : is_safe_ (false)
00173 , until_safe_ (internals_)
00174 , this_ptr_ (0)
00175 , event_(event)
00176 , state_ (rssCREATING)
00177 , complete_requests_ (0)
00178 , rspm_ (0)
00179 {
00180 Routing_Slip_Guard guard (sequence_lock_);
00181 this->sequence_ = ++routing_slip_sequence_;
00182 if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
00183 ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
00184 this->sequence_
00185 ));
00186 }
00187
00188 Routing_Slip::~Routing_Slip ()
00189 {
00190 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00191 ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
00192 this->sequence_
00193 ));
00194 }
00195
00196 bool
00197 Routing_Slip::create_persistence_manager()
00198 {
00199 if (this->rspm_ == 0)
00200 {
00201 Event_Persistence_Strategy * strategy =
00202 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00203 if (strategy != 0)
00204 {
00205 Event_Persistence_Factory * factory = strategy->get_factory ();
00206 if (factory != 0)
00207 {
00208 set_rspm (factory->create_routing_slip_persistence_manager(this));
00209 }
00210 }
00211 }
00212 return this->rspm_ != 0;
00213 }
00214
00215 const TAO_Notify_Event::Ptr &
00216 Routing_Slip::event () const
00217 {
00218 return this->event_;
00219 }
00220
00221 void
00222 Routing_Slip::wait_persist ()
00223 {
00224 Routing_Slip_Guard guard (this->internals_);
00225 while (!this->is_safe_)
00226 {
00227 this->until_safe_.wait ();
00228 }
00229 }
00230
00231 void
00232 Routing_Slip::route (TAO_Notify_ProxyConsumer* pc, bool reliable_channel ACE_ENV_ARG_DECL)
00233 {
00234 ACE_ASSERT(pc != 0);
00235
00236 TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
00237
00238 Routing_Slip_Guard guard (this->internals_);
00239
00240 size_t request_id = delivery_requests_.size ();
00241
00242 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00243 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
00244 this->sequence_,
00245 static_cast<int> (request_id),
00246 static_cast<int> (this->complete_requests_),
00247 static_cast<int> (this->delivery_requests_.size ())
00248 ));
00249
00250 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00251 this->delivery_requests_.push_back (request);
00252 TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
00253
00254 if (this->state_ == rssCREATING)
00255 {
00256 if (! reliable_channel)
00257 {
00258 enter_state_transient (guard);
00259 }
00260 else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
00261 {
00262 enter_state_transient (guard);
00263 }
00264 else if (! this->event_->reliable().is_valid())
00265 {
00266 enter_state_new (guard);
00267 }
00268 else if (this->event_->reliable().value() == CosNotification::Persistent)
00269 {
00270 enter_state_new (guard);
00271 }
00272 else
00273 {
00274 enter_state_transient (guard);
00275 }
00276 }
00277 guard.release ();
00278 pc->execute_task (method ACE_ENV_ARG_PARAMETER);
00279 }
00280 #if 0 // forward
00281 void
00282 Routing_Slip::forward (TAO_Notify_ProxySupplier* ps, bool filter)
00283 {
00284
00285 ACE_ASSERT (this->state_ == rssCREATING);
00286
00287 TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00288 Routing_Slip_Guard guard (this->internals_);
00289
00290 enter_state_transient (guard);
00291 size_t request_id = delivery_requests_.size ();
00292
00293 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00294 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Forward %s; completed %d of %d\n"),
00295 this->sequence_,
00296 static_cast<int> (request_id),
00297 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00298 static_cast<int> (this->complete_requests_),
00299 static_cast<int> (this->delivery_requests_.size ())
00300 ));
00301
00302 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00303 if (! ps->has_shutdown() )
00304 {
00305 this->delivery_requests_.push_back (request);
00306
00307 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00308 guard.release ();
00309 if (DEBUG_LEVEL > 8)
00310 ACE_DEBUG ((LM_DEBUG,
00311 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00312 "proxy supplier %d\n",
00313 this->sequence_,
00314 static_cast<int> (request_id),
00315 ps->id()));
00316 ps->worker_task()->execute (method);
00317 }
00318 else
00319 {
00320 if (DEBUG_LEVEL > 5)
00321 ACE_DEBUG ((LM_DEBUG,
00322 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00323 "proxy supplier %d; already shut down\n",
00324 this->sequence_,
00325 static_cast<int> (request_id),
00326 ps->id()));
00327 }
00328 }
00329 #endif // forward
00330
00331 void
00332 Routing_Slip::dispatch (
00333 TAO_Notify_ProxySupplier* ps,
00334 bool filter
00335 ACE_ENV_ARG_DECL)
00336 {
00337
00338 ACE_ASSERT (this->state_ != rssCREATING);
00339
00340 TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00341 Routing_Slip_Guard guard (this->internals_);
00342
00343 size_t request_id = delivery_requests_.size ();
00344
00345 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00346 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
00347 this->sequence_,
00348 static_cast<int> (request_id),
00349 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00350 static_cast<int> (this->complete_requests_),
00351 static_cast<int> (this->delivery_requests_.size ())
00352 ));
00353
00354 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00355 if (! ps->has_shutdown() )
00356 {
00357 this->delivery_requests_.push_back (request);
00358 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00359 guard.release ();
00360 if (DEBUG_LEVEL > 8)
00361 ACE_DEBUG ((LM_DEBUG,
00362 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00363 "proxy supplier %d\n",
00364 this->sequence_,
00365 static_cast<int> (request_id),
00366 ps->id()));
00367 ps->execute_task (method ACE_ENV_ARG_PARAMETER);
00368 ACE_CHECK;
00369 }
00370 else
00371 {
00372 if (DEBUG_LEVEL > 5)
00373 ACE_DEBUG ((LM_DEBUG,
00374 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00375 "proxy supplier %d; already shut down\n",
00376 this->sequence_,
00377 static_cast<int> (request_id),
00378 ps->id()));
00379 }
00380 }
00381
00382
00383
00384
00385 void
00386 Routing_Slip::delivery_request_complete (size_t request_id)
00387 {
00388 Routing_Slip_Guard guard (this->internals_);
00389 ACE_ASSERT (request_id < this->delivery_requests_.size ());
00390
00391 this->delivery_requests_[request_id].reset ();
00392 this->complete_requests_ += 1;
00393
00394 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00395 ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
00396 this->sequence_,
00397 static_cast<int> (request_id),
00398 static_cast<int> (this->complete_requests_),
00399 static_cast<int> (this->delivery_requests_.size ())
00400 ));
00401 State state = this->state_;
00402 switch (state)
00403 {
00404 case rssTRANSIENT:
00405 {
00406 continue_state_transient (guard);
00407 break;
00408 }
00409 case rssNEW:
00410 {
00411 continue_state_new (guard);
00412 break;
00413 }
00414 case rssSAVING:
00415 {
00416 enter_state_changed_while_saving (guard);
00417 break;
00418 }
00419 case rssUPDATING:
00420 {
00421 enter_state_changed_while_saving (guard);
00422 break;
00423 }
00424 case rssSAVED:
00425 {
00426 enter_state_changed (guard);
00427 break;
00428 }
00429 case rssCHANGED_WHILE_SAVING:
00430 {
00431 continue_state_changed_while_saving (guard);
00432 break;
00433 }
00434 case rssCHANGED:
00435 {
00436 continue_state_changed (guard);
00437 break;
00438 }
00439 default:
00440 {
00441 ACE_ERROR ((LM_ERROR,
00442 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
00443 static_cast<int> (this->state_)
00444 ));
00445 break;
00446 }
00447 }
00448 }
00449
00450 void
00451 Routing_Slip::at_front_of_persist_queue ()
00452 {
00453 Routing_Slip_Guard guard (this->internals_);
00454 State state = this->state_;
00455 switch (state)
00456 {
00457 case rssNEW:
00458 {
00459 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00460 ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
00461 this->sequence_
00462 ));
00463 enter_state_saving (guard);
00464 break;
00465 }
00466 case rssCOMPLETE_WHILE_NEW:
00467 {
00468 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00469 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
00470 this->sequence_
00471 ));
00472 this->persistent_queue_.complete ();
00473 enter_state_terminal (guard);
00474 break;
00475 }
00476 case rssCHANGED:
00477 {
00478 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00479 ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
00480 this->sequence_
00481 ));
00482 enter_state_updating (guard);
00483 break;
00484 }
00485 case rssCOMPLETE:
00486 {
00487 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00488 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
00489 this->sequence_
00490 ));
00491 enter_state_deleting (guard);
00492 break;
00493 }
00494 default:
00495 {
00496 ACE_ERROR ((LM_ERROR,
00497 ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
00498 this->sequence_,
00499 static_cast<int> (this->state_)
00500 ));
00501 break;
00502 }
00503 }
00504 }
00505
00506 void
00507 Routing_Slip::persist_complete ()
00508 {
00509
00510 Routing_Slip_Ptr me(this->this_ptr_);
00511 Routing_Slip_Guard guard (this->internals_);
00512 ACE_ASSERT (guard.locked ());
00513
00514
00515 if (! is_safe_)
00516 {
00517 is_safe_ = true;
00518 this->until_safe_.signal ();
00519 }
00520
00521 State state = this->state_;
00522 switch (state)
00523 {
00524 case rssSAVING:
00525 {
00526 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00527 ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
00528 this->sequence_
00529 ));
00530 enter_state_saved(guard);
00531 break;
00532 }
00533 case rssCHANGED_WHILE_SAVING:
00534 {
00535 enter_state_changed (guard);
00536 break;
00537 }
00538 case rssUPDATING:
00539 {
00540 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00541 ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
00542 this->sequence_
00543 ));
00544 enter_state_saved (guard);
00545 break;
00546 }
00547 case rssDELETING:
00548 {
00549 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00550 ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
00551 this->sequence_
00552 ));
00553 enter_state_terminal (guard);
00554 break;
00555 }
00556 default:
00557 {
00558 ACE_ERROR ((LM_ERROR,
00559 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
00560 static_cast<int> (this->state_)
00561 ));
00562 break;
00563 }
00564 }
00565 this->persistent_queue_.complete ();
00566 }
00567
00568
00569
00570
00571 bool
00572 Routing_Slip::all_deliveries_complete () const
00573 {
00574 return this->complete_requests_ == this->delivery_requests_.size ();
00575 }
00576
00577 void
00578 Routing_Slip::add_to_persist_queue(Routing_Slip_Guard & guard)
00579 {
00580 guard.release ();
00581 this->persistent_queue_.add (this->this_ptr_);
00582 guard.acquire ();
00583 }
00584
00585
00586
00587
00588 void
00589 Routing_Slip::enter_state_new (Routing_Slip_Guard & guard)
00590 {
00591 ++count_enter_new_;
00592 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00593 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
00594 this->sequence_
00595 ));
00596 this->state_ = rssNEW;
00597 add_to_persist_queue(guard);
00598 }
00599
00600 void
00601 Routing_Slip::continue_state_new (Routing_Slip_Guard & guard)
00602 {
00603 ++count_continue_new_;
00604 if (all_deliveries_complete ())
00605 {
00606 this->enter_state_complete_while_new (guard);
00607 }
00608 }
00609 void
00610 Routing_Slip::enter_state_complete_while_new (Routing_Slip_Guard & guard)
00611 {
00612 ++count_enter_complete_while_new_;
00613 ACE_UNUSED_ARG (guard);
00614 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00615 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
00616 this->sequence_
00617 ));
00618
00619 if (! is_safe_)
00620 {
00621 is_safe_ = true;
00622 this->until_safe_.signal ();
00623 }
00624 this->state_ = rssCOMPLETE_WHILE_NEW;
00625 }
00626
00627 void
00628 Routing_Slip::enter_state_reloaded (Routing_Slip_Guard & guard)
00629 {
00630 ++count_enter_reloaded_;
00631 ACE_UNUSED_ARG (guard);
00632 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00633 ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"),
00634 this->sequence_
00635 ));
00636 this->state_ = rssRELOADED;
00637 }
00638
00639 void
00640 Routing_Slip::enter_state_transient (Routing_Slip_Guard & guard)
00641 {
00642 ++count_enter_transient_;
00643 ACE_UNUSED_ARG (guard);
00644 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00645 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
00646 this->sequence_
00647 ));
00648 this->state_ = rssTRANSIENT;
00649 if (! is_safe_)
00650 {
00651 is_safe_ = true;
00652 this->until_safe_.signal ();
00653 }
00654 if (all_deliveries_complete ())
00655 {
00656 enter_state_terminal (guard);
00657 }
00658 }
00659
00660 void
00661 Routing_Slip::continue_state_transient (Routing_Slip_Guard & guard)
00662 {
00663 ++count_continue_transient_;
00664 if (all_deliveries_complete ())
00665 {
00666 enter_state_terminal (guard);
00667 }
00668 }
00669 void
00670 Routing_Slip::enter_state_saving (Routing_Slip_Guard & guard)
00671 {
00672 ++count_enter_saving_;
00673 if (!create_persistence_manager ())
00674 {
00675
00676
00677 this->persistent_queue_.complete ();
00678 enter_state_transient (guard);
00679 }
00680 else
00681 {
00682 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00683 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
00684 this->sequence_
00685 ));
00686 this->state_ = rssSAVING;
00687
00688 TAO_OutputCDR event_cdr;
00689 this->event_->marshal (event_cdr);
00690
00691 const ACE_Message_Block *event_mb = event_cdr.begin ();
00692 TAO_OutputCDR rs_cdr;
00693 marshal (rs_cdr);
00694 const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00695
00696 guard.release ();
00697 this->rspm_->store (*event_mb, *rs_mb);
00698
00699 guard.acquire ();
00700 }
00701 }
00702
00703 void
00704 Routing_Slip::enter_state_saved (Routing_Slip_Guard & guard)
00705 {
00706 ++count_enter_saved_;
00707 ACE_UNUSED_ARG (guard);
00708 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00709 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
00710 this->sequence_
00711 ));
00712 this->state_ = rssSAVED;
00713 }
00714
00715 void
00716 Routing_Slip::enter_state_updating (Routing_Slip_Guard & guard)
00717 {
00718 ++count_enter_updating_;
00719 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00720 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
00721 this->sequence_
00722 ));
00723 this->state_ = rssUPDATING;
00724
00725 TAO_OutputCDR rs_cdr;
00726 marshal (rs_cdr);
00727 const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00728 guard.release ();
00729
00730 ACE_ASSERT (this->rspm_ != 0);
00731 this->rspm_->update (*rs_mb);
00732 guard.acquire ();
00733 }
00734
00735
00736 void
00737 Routing_Slip::enter_state_changed_while_saving (Routing_Slip_Guard & guard)
00738 {
00739 ++count_enter_changed_while_saving_;
00740 ACE_UNUSED_ARG (guard);
00741 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00742 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
00743 this->sequence_
00744 ));
00745 this->state_ = rssCHANGED_WHILE_SAVING;
00746 }
00747
00748 void
00749 Routing_Slip::continue_state_changed_while_saving (Routing_Slip_Guard & guard)
00750 {
00751 ACE_UNUSED_ARG (guard);
00752
00753 }
00754
00755 void
00756 Routing_Slip::enter_state_changed (Routing_Slip_Guard & guard)
00757 {
00758 ++count_enter_changed_;
00759 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00760 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
00761 this->sequence_
00762 ));
00763
00764
00765 this->state_ = rssCHANGED;
00766 if (all_deliveries_complete ())
00767 {
00768 enter_state_complete (guard);
00769 }
00770 add_to_persist_queue (guard);
00771 }
00772
00773 void
00774 Routing_Slip::continue_state_changed (Routing_Slip_Guard & guard)
00775 {
00776 ++count_continue_changed_;
00777 if (all_deliveries_complete ())
00778 {
00779 enter_state_complete (guard);
00780 }
00781 }
00782
00783 void
00784 Routing_Slip::enter_state_complete (Routing_Slip_Guard & guard)
00785 {
00786 ++count_enter_complete_;
00787 ACE_UNUSED_ARG (guard);
00788 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00789 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
00790 this->sequence_
00791 ));
00792 this->state_ = rssCOMPLETE;
00793 }
00794
00795 void
00796 Routing_Slip::enter_state_deleting (Routing_Slip_Guard & guard)
00797 {
00798 ++count_enter_deleting_;
00799 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00800 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
00801 this->sequence_
00802 ));
00803 this->state_ = rssDELETING;
00804 guard.release ();
00805 this->rspm_->remove ();
00806 guard.acquire ();
00807 }
00808
00809 void
00810 Routing_Slip::enter_state_terminal (Routing_Slip_Guard & guard)
00811 {
00812 ++count_enter_terminal_;
00813 ACE_UNUSED_ARG (guard);
00814 ACE_ASSERT( this->is_safe_);
00815 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00816 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
00817 this->sequence_
00818 ));
00819 this->state_ = rssTERMINAL;
00820 this->this_ptr_.reset ();
00821 }
00822
00823 void
00824 Routing_Slip::marshal (TAO_OutputCDR & cdr)
00825 {
00826 size_t request_count = this->delivery_requests_.size();
00827 cdr.write_ulong (request_count - this->complete_requests_);
00828 for (size_t nreq = 0; nreq < request_count; ++nreq)
00829 {
00830 Delivery_Request * request = this->delivery_requests_[nreq].get ();
00831 if (request != 0)
00832 {
00833 request->marshal (cdr);
00834 }
00835 }
00836 }
00837
00838 bool
00839 Routing_Slip::unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR & cdr)
00840 {
00841 CORBA::ULong count = 0;
00842 cdr.read_ulong (count);
00843 for (size_t nreq = 0; nreq < count; ++nreq)
00844 {
00845 ACE_CDR::Octet code = 0;
00846 while (cdr.read_octet(code))
00847 {
00848 ACE_DECLARE_NEW_ENV;
00849 ACE_TRY
00850 {
00851 if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
00852 {
00853 Delivery_Request * prequest;
00854 ACE_NEW_THROW_EX (
00855 prequest,
00856 Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
00857 CORBA::NO_MEMORY ());
00858 ACE_TRY_CHECK;
00859 Delivery_Request_Ptr request(prequest);
00860 TAO_Notify_Method_Request_Dispatch_Queueable * method =
00861 TAO_Notify_Method_Request_Dispatch::unmarshal (
00862 request,
00863 ecf,
00864 cdr
00865 ACE_ENV_ARG_PARAMETER);
00866 ACE_TRY_CHECK;
00867 if (method != 0)
00868 {
00869 this->delivery_requests_.push_back (request);
00870 this->delivery_methods_.push_back (method);
00871 }
00872 }
00873 else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
00874 {
00875 Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
00876 TAO_Notify_Method_Request_Lookup_Queueable * method =
00877 TAO_Notify_Method_Request_Lookup::unmarshal (
00878 request,
00879 ecf,
00880 cdr
00881 ACE_ENV_ARG_PARAMETER);
00882 ACE_TRY_CHECK
00883 if (method != 0)
00884 {
00885 this->delivery_requests_.push_back (request);
00886 this->delivery_methods_.push_back (method);
00887 }
00888 }
00889 }
00890 ACE_CATCHANY;
00891 {
00892
00893
00894 }
00895 ACE_ENDTRY;
00896 }
00897 }
00898 return this->delivery_requests_.size () > 0;
00899 }
00900
00901 void
00902 Routing_Slip::reconnect (ACE_ENV_SINGLE_ARG_DECL)
00903 {
00904 Routing_Slip_Guard guard (this->internals_);
00905 enter_state_saved (guard);
00906 guard.release ();
00907
00908 size_t count = this->delivery_methods_.size ();
00909 for (size_t nmethod = 0; nmethod < count; ++nmethod)
00910 {
00911 this->delivery_methods_[nmethod]->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00912 ACE_CHECK;
00913 }
00914 this->delivery_methods_.clear ();
00915 }
00916
00917 int
00918 Routing_Slip::sequence() const
00919 {
00920 return this->sequence_;
00921 }
00922
00923 bool
00924 Routing_Slip::should_retry () const
00925 {
00926
00927
00928 return this->state_ != rssTRANSIENT;
00929 }
00930
00931 }
00932
00933 TAO_END_VERSIONED_NAMESPACE_DECL