00001
00002
00003 #include "orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h"
00004
00005 #include "ace/Monitor_Point_Registry.h"
00006
00007 #include "orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h"
00008 #include "orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h"
00009 #include "orbsvcs/Notify/MonitorControl/Dynamic_Statistic.h"
00010 #include "orbsvcs/Notify/MonitorControl/Timestamp_Monitor.h"
00011 #include "orbsvcs/Notify/MonitorControl/Control_Registry.h"
00012 #include "orbsvcs/Notify/Buffering_Strategy.h"
00013 #include "orbsvcs/Notify/ProxySupplier.h"
00014 #include "orbsvcs/Notify/ProxyConsumer.h"
00015 #include "orbsvcs/Notify/ThreadPool_Task.h"
00016
00017 #if defined (TAO_HAS_MONITOR_FRAMEWORK) && (TAO_HAS_MONITOR_FRAMEWORK == 1)
00018
00019 using namespace ACE_VERSIONED_NAMESPACE_NAME::ACE::Monitor_Control;
00020
00021 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023
00024
00025
00026
00027 class EventChannelConsumersSuppliers
00028 : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel>
00029 {
00030 public:
00031 EventChannelConsumersSuppliers (TAO_MonitorEventChannel* ec,
00032 const ACE_CString& name,
00033 Monitor_Control_Types::Information_Type type,
00034 bool is_supplier = false)
00035 : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec,
00036 name.c_str (),
00037 type),
00038 is_supplier_ (is_supplier)
00039 {
00040 }
00041
00042 virtual void update (void) {
00043 if (this->type () == Monitor_Control_Types::MC_LIST)
00044 {
00045 Monitor_Control_Types::NameList list;
00046 if (this->is_supplier_)
00047 this->interf_->get_suppliers (&list);
00048 else
00049 this->interf_->get_consumers (&list);
00050 this->receive (list);
00051 }
00052 else
00053 {
00054 if (this->is_supplier_)
00055 this->receive (this->interf_->get_suppliers (0));
00056 else
00057 this->receive (this->interf_->get_consumers (0));
00058 }
00059 }
00060
00061 private:
00062 bool is_supplier_;
00063 };
00064
00065 class EventChannelTimedoutConsumers
00066 : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel>
00067 {
00068 public:
00069 EventChannelTimedoutConsumers (TAO_MonitorEventChannel* ec,
00070 const ACE_CString& name,
00071 Monitor_Control_Types::Information_Type type)
00072 : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec,
00073 name.c_str (),
00074 type)
00075 {
00076 }
00077
00078 virtual void update (void) {
00079 Monitor_Control_Types::NameList list;
00080 this->interf_->get_timedout_consumers (&list);
00081 this->receive (list);
00082 }
00083 };
00084
00085 class EventChannelConsumerSupplierAdmins
00086 : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel>
00087 {
00088 public:
00089 EventChannelConsumerSupplierAdmins (TAO_MonitorEventChannel* ec,
00090 const ACE_CString& name,
00091 Monitor_Control_Types::Information_Type type,
00092 bool is_supplier = false)
00093 : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec,
00094 name.c_str (),
00095 type),
00096 is_supplier_ (is_supplier)
00097 {
00098 }
00099
00100 virtual void update (void)
00101 {
00102 if (this->type () == Monitor_Control_Types::MC_LIST)
00103 {
00104 Monitor_Control_Types::NameList list;
00105
00106 if (this->is_supplier_)
00107 {
00108 this->interf_->get_supplieradmins (&list);
00109 }
00110 else
00111 {
00112 this->interf_->get_consumeradmins (&list);
00113 }
00114
00115 this->receive (list);
00116 }
00117 else
00118 {
00119 if (this->is_supplier_)
00120 {
00121 this->receive (this->interf_->get_supplieradmins (0));
00122 }
00123 else
00124 {
00125 this->receive (this->interf_->get_consumeradmins (0));
00126 }
00127 }
00128 }
00129
00130 private:
00131 bool is_supplier_;
00132 };
00133
00134 class QueuedEvents
00135 : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel>
00136 {
00137 public:
00138 QueuedEvents (TAO_MonitorEventChannel* ec,
00139 const ACE_CString& name,
00140 Monitor_Control_Types::Information_Type type,
00141 bool count)
00142 : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec,
00143 name.c_str (),
00144 type),
00145 count_ (count)
00146 {
00147 }
00148
00149 virtual void update (void)
00150 {
00151 this->receive (this->interf_->calculate_queue_size (this->count_));
00152 }
00153
00154 private:
00155 bool count_;
00156 };
00157
00158 class OldestEvent
00159 : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel>
00160 {
00161 public:
00162 OldestEvent (TAO_MonitorEventChannel* ec,
00163 const ACE_CString& name,
00164 Monitor_Control_Types::Information_Type type)
00165 : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec,
00166 name.c_str (),
00167 type)
00168 {
00169 }
00170
00171 virtual void update (void)
00172 {
00173 this->receive (this->interf_->get_oldest_event ());
00174 }
00175 };
00176
00177 class SlowestConsumers
00178 : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel>
00179 {
00180 public:
00181 SlowestConsumers (TAO_MonitorEventChannel* ec,
00182 const ACE_CString& name,
00183 Monitor_Control_Types::Information_Type type)
00184 : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec,
00185 name.c_str (),
00186 type)
00187 {
00188 }
00189
00190 virtual void update (void)
00191 {
00192 Monitor_Control_Types::NameList list;
00193 this->interf_->determine_slowest_consumer (&list);
00194 this->receive (list);
00195 }
00196 };
00197
00198 class QueueOverflows:
00199 public Monitor_Base
00200 {
00201 public:
00202 QueueOverflows (const ACE_CString& name)
00203 : Monitor_Base (
00204 name.c_str (),
00205 Monitor_Control_Types::MC_COUNTER) {
00206 }
00207 };
00208
00209 class ShutdownControl : public TAO_NS_Control
00210 {
00211 public:
00212 ShutdownControl (TAO_MonitorEventChannel* ec,
00213 const ACE_CString& name)
00214 : TAO_NS_Control (name.c_str ()),
00215 ec_ (ec)
00216 {
00217 }
00218
00219 virtual bool execute (const char* command)
00220 {
00221 if (ACE_OS::strcmp (command, TAO_NS_CONTROL_SHUTDOWN) == 0)
00222 {
00223 this->ec_->destroy ();
00224 }
00225 else
00226 {
00227 return false;
00228 }
00229
00230 return true;
00231 }
00232
00233 private:
00234 TAO_MonitorEventChannel* ec_;
00235 };
00236
00237 class RemoveConsumerSupplierControl : public TAO_NS_Control
00238 {
00239 public:
00240 RemoveConsumerSupplierControl (TAO_MonitorEventChannel* ec,
00241 const ACE_CString& name,
00242 CosNotifyChannelAdmin::ProxyID id,
00243 bool is_supplier)
00244 : TAO_NS_Control (name.c_str ()),
00245 ec_ (ec),
00246 id_ (id),
00247 is_supplier_ (is_supplier)
00248 {
00249 }
00250
00251 virtual bool execute (const char* command)
00252 {
00253 bool status = true;
00254
00255 if (this->is_supplier_)
00256 {
00257 if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_SUPPLIER) == 0)
00258 {
00259 status = this->ec_->destroy_supplier (this->id_);
00260 }
00261 else
00262 {
00263
00264 return false;
00265 }
00266 }
00267 else
00268 {
00269 if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_CONSUMER) == 0)
00270 {
00271 status = this->ec_->destroy_consumer (this->id_);
00272 }
00273 else
00274 {
00275
00276 return false;
00277 }
00278 }
00279
00280 return status;
00281 }
00282
00283 private:
00284 TAO_MonitorEventChannel* ec_;
00285 CosNotifyChannelAdmin::ProxyID id_;
00286 bool is_supplier_;
00287 };
00288
00289
00290
00291
00292
00293 TAO_MonitorEventChannel::TAO_MonitorEventChannel (const char* name)
00294 : name_ (name)
00295 {
00296 this->add_stats ();
00297 }
00298
00299 TAO_MonitorEventChannel::~TAO_MonitorEventChannel (void)
00300 {
00301 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_);
00302
00303 Monitor_Point_Registry* instance = Monitor_Point_Registry::instance ();
00304 size_t size = this->stat_names_.size ();
00305
00306 for (size_t i = 0; i < size; ++i)
00307 {
00308 instance->remove (this->stat_names_[i].c_str ());
00309 }
00310
00311 TAO_Control_Registry* cinstance = TAO_Control_Registry::instance ();
00312 size = this->control_names_.size ();
00313
00314 for (size_t i = 0; i < size; ++i)
00315 {
00316 cinstance->remove (this->control_names_[i]);
00317 }
00318 }
00319
00320 const ACE_CString&
00321 TAO_MonitorEventChannel::name (void) const
00322 {
00323 return this->name_;
00324 }
00325
00326 bool
00327 TAO_MonitorEventChannel::register_statistic (const ACE_CString& name,
00328 Monitor_Base* stat)
00329 {
00330
00331
00332 bool added = Monitor_Point_Registry::instance ()->add (stat);
00333
00334 if (added)
00335 {
00336 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->names_mutex_, added);
00337
00338 this->stat_names_.push_back (name);
00339 }
00340
00341 return added;
00342 }
00343
00344 bool
00345 TAO_MonitorEventChannel::unregister_statistic (const ACE_CString& name)
00346 {
00347
00348
00349 bool removed = Monitor_Point_Registry::instance ()->remove (name.c_str ());
00350
00351 if (removed)
00352 {
00353 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->names_mutex_, removed);
00354
00355 this->remove_list_name (this->stat_names_, name);
00356 }
00357
00358 return removed;
00359 }
00360
00361 void
00362 TAO_MonitorEventChannel::map_supplier_proxy (
00363 CosNotifyChannelAdmin::ProxyID id,
00364 const ACE_CString& name)
00365 {
00366 if (name.length () == 0)
00367 {
00368 throw NotifyMonitoringExt::NameMapError ();
00369 }
00370
00371 ACE_CString full = this->name_ + "/" + name;
00372
00373 ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->supplier_mutex_);
00374
00375 if (this->is_duplicate_name (this->supplier_map_, full))
00376 {
00377 throw NotifyMonitoringExt::NameAlreadyUsed ();
00378 }
00379
00380 if (this->supplier_map_.bind (id, full) != 0)
00381 {
00382 throw NotifyMonitoringExt::NameMapError ();
00383 }
00384
00385
00386
00387
00388 RemoveConsumerSupplierControl* rcsc = 0;
00389 ACE_NEW_THROW_EX (rcsc,
00390 RemoveConsumerSupplierControl (this, full, id, false),
00391 CORBA::NO_MEMORY ());
00392
00393 TAO_Control_Registry* cinstance = TAO_Control_Registry::instance ();
00394
00395 if (cinstance->add (rcsc))
00396 {
00397 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_);
00398
00399 this->control_names_.push_back (full);
00400 }
00401 else
00402 {
00403 delete rcsc;
00404 ACE_ERROR ((LM_ERROR,
00405 "Unable to add control: %s\n",
00406 full.c_str ()));
00407 }
00408 }
00409
00410 void
00411 TAO_MonitorEventChannel::map_consumer_proxy (
00412 CosNotifyChannelAdmin::ProxyID id,
00413 const ACE_CString& name)
00414 {
00415 if (name.length () == 0)
00416 {
00417 throw NotifyMonitoringExt::NameMapError ();
00418 }
00419
00420 ACE_CString full = this->name_ + "/" + name;
00421
00422 ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->consumer_mutex_);
00423
00424 if (this->is_duplicate_name (this->consumer_map_, full))
00425 {
00426 throw NotifyMonitoringExt::NameAlreadyUsed ();
00427 }
00428
00429 if (this->consumer_map_.bind (id, full) != 0)
00430 {
00431 throw NotifyMonitoringExt::NameMapError ();
00432 }
00433
00434
00435
00436
00437 RemoveConsumerSupplierControl* rcsc = 0;
00438 ACE_NEW_THROW_EX (rcsc,
00439 RemoveConsumerSupplierControl (this, full, id, true),
00440 CORBA::NO_MEMORY ());
00441
00442 TAO_Control_Registry* cinstance = TAO_Control_Registry::instance ();
00443
00444 if (cinstance->add (rcsc))
00445 {
00446 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_);
00447
00448 this->control_names_.push_back (full);
00449 }
00450 else
00451 {
00452 delete rcsc;
00453 ACE_ERROR ((LM_ERROR,
00454 "Unable to add control: %s\n",
00455 full.c_str ()));
00456 }
00457 }
00458
00459 void
00460 TAO_MonitorEventChannel::cleanup_proxy (CosNotifyChannelAdmin::ProxyID id,
00461 bool is_supplier,
00462 bool experienced_timeout)
00463 {
00464 ACE_CString name;
00465
00466 if (is_supplier)
00467 {
00468 ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->supplier_mutex_);
00469
00470
00471
00472
00473
00474
00475
00476
00477 this->supplier_map_.unbind (id, name);
00478
00479
00480
00481
00482 if (experienced_timeout && name.length () != 0)
00483 {
00484 ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, tguard,
00485 this->timedout_supplier_mutex_);
00486 this->timedout_supplier_map_.bind (id, name);
00487 }
00488 }
00489 else
00490 {
00491 ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->consumer_mutex_);
00492
00493
00494 this->consumer_map_.unbind (id, name);
00495 }
00496
00497 if (name.length () != 0)
00498 {
00499 TAO_Control_Registry* cinstance = TAO_Control_Registry::instance ();
00500 cinstance->remove (name);
00501
00502 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_);
00503
00504 this->remove_list_name (this->control_names_, name);
00505 }
00506 }
00507
00508 void
00509 TAO_MonitorEventChannel::remove_consumeradmin (
00510 CosNotifyChannelAdmin::ProxyID id)
00511 {
00512 ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->consumeradmin_mutex_);
00513
00514 this->consumeradmin_map_.unbind (id);
00515 }
00516
00517 void
00518 TAO_MonitorEventChannel::remove_supplieradmin (
00519 CosNotifyChannelAdmin::ProxyID id)
00520 {
00521 ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->supplieradmin_mutex_);
00522
00523 this->supplieradmin_map_.unbind (id);
00524 }
00525
00526 void
00527 TAO_MonitorEventChannel::add_stats (const char* name)
00528 {
00529 if (name != 0 && this->name_.length () == 0)
00530 {
00531 this->name_ = name;
00532 }
00533
00534 if (this->name_.length () != 0)
00535 {
00536 ACE_CString dir_name (this->name_ + "/");
00537 ACE_CString stat_name = dir_name +
00538 NotifyMonitoringExt::EventChannelCreationTime;
00539 Monitor_Base* timestamp = 0;
00540 ACE_NEW_THROW_EX (timestamp,
00541 Timestamp_Monitor (stat_name.c_str ()),
00542 CORBA::NO_MEMORY ());
00543 ACE_Time_Value tv (ACE_OS::gettimeofday ());
00544 timestamp->receive (tv.sec () + (tv.usec () / 1000000.0));
00545
00546 if (!this->register_statistic (stat_name, timestamp))
00547 {
00548 ACE_ERROR ((LM_ERROR,
00549 "Unable to add statistic %s\n",
00550 stat_name.c_str ()));
00551 }
00552
00553
00554 timestamp->remove_ref ();
00555
00556 stat_name = dir_name
00557 + NotifyMonitoringExt::EventChannelConsumerCount;
00558 EventChannelConsumersSuppliers* consumers = 0;
00559 ACE_NEW_THROW_EX (consumers,
00560 EventChannelConsumersSuppliers (
00561 this,
00562 stat_name.c_str (),
00563 Monitor_Control_Types::MC_NUMBER),
00564 CORBA::NO_MEMORY ());
00565
00566 if (!this->register_statistic (stat_name, consumers))
00567 {
00568 ACE_ERROR ((LM_ERROR,
00569 "Unable to add statistic %s\n",
00570 stat_name.c_str ()));
00571 }
00572
00573
00574 consumers->remove_ref ();
00575
00576 stat_name = dir_name
00577 + NotifyMonitoringExt::EventChannelConsumerNames;
00578 consumers = 0;
00579 ACE_NEW_THROW_EX (consumers,
00580 EventChannelConsumersSuppliers (
00581 this,
00582 stat_name.c_str (),
00583 Monitor_Control_Types::MC_LIST),
00584 CORBA::NO_MEMORY ());
00585
00586 if (!this->register_statistic (stat_name, consumers))
00587 {
00588 ACE_ERROR ((LM_ERROR,
00589 "Unable to add statistic %s\n",
00590 stat_name.c_str ()));
00591 }
00592
00593
00594 consumers->remove_ref ();
00595
00596 stat_name = dir_name
00597 + NotifyMonitoringExt::EventChannelTimedoutConsumerNames;
00598 EventChannelTimedoutConsumers* tconsumers = 0;
00599 ACE_NEW_THROW_EX (tconsumers,
00600 EventChannelTimedoutConsumers (
00601 this,
00602 stat_name.c_str (),
00603 Monitor_Control_Types::MC_LIST),
00604 CORBA::NO_MEMORY ());
00605
00606 if (!this->register_statistic (stat_name, tconsumers))
00607 {
00608 ACE_ERROR ((LM_ERROR,
00609 "Unable to add statistic %s\n",
00610 stat_name.c_str ()));
00611 }
00612
00613
00614 tconsumers->remove_ref ();
00615
00616 stat_name = dir_name
00617 + NotifyMonitoringExt::EventChannelSupplierCount;
00618 EventChannelConsumersSuppliers* suppliers = 0;
00619 ACE_NEW_THROW_EX (suppliers,
00620 EventChannelConsumersSuppliers (
00621 this,
00622 stat_name.c_str (),
00623 Monitor_Control_Types::MC_NUMBER,
00624 true),
00625 CORBA::NO_MEMORY ());
00626
00627 if (!this->register_statistic (stat_name, suppliers))
00628 {
00629 ACE_ERROR ((LM_ERROR,
00630 "Unable to add statistic %s\n",
00631 stat_name.c_str ()));
00632 }
00633
00634
00635 suppliers->remove_ref ();
00636
00637 stat_name = dir_name
00638 + NotifyMonitoringExt::EventChannelSupplierNames;
00639 suppliers = 0;
00640 ACE_NEW_THROW_EX (suppliers,
00641 EventChannelConsumersSuppliers (
00642 this,
00643 stat_name.c_str (),
00644 Monitor_Control_Types::MC_LIST,
00645 true),
00646 CORBA::NO_MEMORY ());
00647
00648 if (!this->register_statistic (stat_name, suppliers))
00649 {
00650 ACE_ERROR ((LM_ERROR,
00651 "Unable to add statistic %s\n",
00652 stat_name.c_str ()));
00653 }
00654
00655
00656 suppliers->remove_ref ();
00657
00658 stat_name = dir_name
00659 + NotifyMonitoringExt::EventChannelConsumerAdminCount;
00660 EventChannelConsumerSupplierAdmins* conadmins = 0;
00661 ACE_NEW_THROW_EX (conadmins,
00662 EventChannelConsumerSupplierAdmins (
00663 this,
00664 stat_name.c_str (),
00665 Monitor_Control_Types::MC_NUMBER),
00666 CORBA::NO_MEMORY ());
00667
00668 if (!this->register_statistic (stat_name, conadmins))
00669 {
00670 ACE_ERROR ((LM_ERROR,
00671 "Unable to add statistic %s\n",
00672 stat_name.c_str ()));
00673 }
00674
00675
00676 conadmins->remove_ref ();
00677
00678 stat_name = dir_name
00679 + NotifyMonitoringExt::EventChannelConsumerAdminNames;
00680 conadmins = 0;
00681 ACE_NEW_THROW_EX (conadmins,
00682 EventChannelConsumerSupplierAdmins (
00683 this,
00684 stat_name.c_str (),
00685 Monitor_Control_Types::MC_LIST),
00686 CORBA::NO_MEMORY ());
00687
00688 if (!this->register_statistic (stat_name, conadmins))
00689 {
00690 ACE_ERROR ((LM_ERROR,
00691 "Unable to add statistic %s\n",
00692 stat_name.c_str ()));
00693 }
00694
00695
00696 conadmins->remove_ref ();
00697
00698 stat_name = dir_name
00699 + NotifyMonitoringExt::EventChannelSupplierAdminCount;
00700 EventChannelConsumerSupplierAdmins* supadmins = 0;
00701 ACE_NEW_THROW_EX (supadmins,
00702 EventChannelConsumerSupplierAdmins (
00703 this,
00704 stat_name.c_str (),
00705 Monitor_Control_Types::MC_NUMBER,
00706 true),
00707 CORBA::NO_MEMORY ());
00708
00709 if (!this->register_statistic (stat_name, supadmins))
00710 {
00711 ACE_ERROR ((LM_ERROR,
00712 "Unable to add statistic %s\n",
00713 stat_name.c_str ()));
00714 }
00715
00716
00717 supadmins->remove_ref ();
00718
00719 stat_name = dir_name
00720 + NotifyMonitoringExt::EventChannelSupplierAdminNames;
00721 supadmins = 0;
00722 ACE_NEW_THROW_EX (supadmins,
00723 EventChannelConsumerSupplierAdmins (
00724 this,
00725 stat_name.c_str (),
00726 Monitor_Control_Types::MC_LIST,
00727 true),
00728 CORBA::NO_MEMORY ());
00729
00730 if (!this->register_statistic (stat_name, supadmins))
00731 {
00732 ACE_ERROR ((LM_ERROR,
00733 "Unable to add statistic %s\n",
00734 stat_name.c_str ()));
00735 }
00736
00737
00738 supadmins->remove_ref ();
00739
00740 stat_name = dir_name + NotifyMonitoringExt::EventChannelQueueSize;
00741 QueuedEvents* events = 0;
00742 ACE_NEW_THROW_EX (events,
00743 QueuedEvents (this,
00744 stat_name.c_str (),
00745 Monitor_Control_Types::MC_NUMBER,
00746 false),
00747 CORBA::NO_MEMORY ());
00748
00749 if (!this->register_statistic (stat_name, events))
00750 {
00751 ACE_ERROR ((LM_ERROR,
00752 "Unable to add statistic %s\n",
00753 stat_name.c_str ()));
00754 }
00755
00756
00757 events->remove_ref ();
00758
00759 stat_name = dir_name
00760 + NotifyMonitoringExt::EventChannelQueueElementCount;
00761 events = 0;
00762 ACE_NEW_THROW_EX (events,
00763 QueuedEvents (this,
00764 stat_name.c_str (),
00765 Monitor_Control_Types::MC_NUMBER,
00766 true),
00767 CORBA::NO_MEMORY ());
00768
00769 events->add_to_registry ();
00770 events->remove_ref ();
00771
00772 stat_name = dir_name + NotifyMonitoringExt::EventChannelOldestEvent;
00773 OldestEvent* oldest = 0;
00774 ACE_NEW_THROW_EX (oldest,
00775 OldestEvent (this, stat_name.c_str (),
00776 Monitor_Control_Types::MC_TIME),
00777 CORBA::NO_MEMORY ());
00778
00779 if (!this->register_statistic (stat_name, oldest))
00780 {
00781 ACE_ERROR ((LM_ERROR,
00782 "Unable to add statistic %s\n",
00783 stat_name.c_str ()));
00784 }
00785
00786
00787 oldest->remove_ref ();
00788
00789 stat_name = dir_name
00790 + NotifyMonitoringExt::EventChannelSlowestConsumers;
00791 SlowestConsumers* slowest = 0;
00792 ACE_NEW_THROW_EX (slowest,
00793 SlowestConsumers (this,
00794 stat_name.c_str (),
00795 Monitor_Control_Types::MC_LIST),
00796 CORBA::NO_MEMORY ());
00797
00798 if (!this->register_statistic (stat_name, slowest))
00799 {
00800 ACE_ERROR ((LM_ERROR,
00801 "Unable to add statistic %s\n",
00802 stat_name.c_str ()));
00803 }
00804
00805
00806 slowest->remove_ref ();
00807
00808 stat_name = dir_name +
00809 NotifyMonitoringExt::EventChannelQueueOverflows;
00810 QueueOverflows* overflows = 0;
00811 ACE_NEW_THROW_EX (overflows,
00812 QueueOverflows (stat_name.c_str ()),
00813 CORBA::NO_MEMORY ());
00814 if (!this->register_statistic (stat_name, overflows))
00815 {
00816 delete overflows;
00817 ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n",
00818 stat_name.c_str ()));
00819 }
00820
00821 TAO_Control_Registry* cinstance =
00822 TAO_Control_Registry::instance ();
00823
00824 ShutdownControl* sd = 0;
00825 ACE_NEW_THROW_EX (sd,
00826 ShutdownControl (this,
00827 this->name_),
00828 CORBA::NO_MEMORY ());
00829
00830 if (cinstance->add (sd))
00831 {
00832 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_);
00833 this->control_names_.push_back (this->name_);
00834 }
00835 else
00836 {
00837 delete sd;
00838 ACE_ERROR ((LM_ERROR,
00839 "Unable to add control: %s\n",
00840 this->name_.c_str ()));
00841 }
00842 }
00843 }
00844
00845 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00846 TAO_MonitorEventChannel::new_for_consumers (
00847 CosNotifyChannelAdmin::InterFilterGroupOperator op,
00848 CosNotifyChannelAdmin::AdminID_out id)
00849 {
00850 return this->named_new_for_consumers (op, id, 0);
00851 }
00852
00853 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00854 TAO_MonitorEventChannel::named_new_for_consumers (
00855 CosNotifyChannelAdmin::InterFilterGroupOperator op,
00856 CosNotifyChannelAdmin::AdminID_out id,
00857 const char* name)
00858 {
00859 if (name != 0 && name[0] == 0)
00860 {
00861 throw NotifyMonitoringExt::NameMapError ();
00862 }
00863
00864 CosNotifyChannelAdmin::ConsumerAdmin_var admin =
00865 this->TAO_Notify_EventChannel::new_for_consumers (op, id);
00866
00867
00868 TAO_MonitorConsumerAdmin* low_admin =
00869 dynamic_cast<TAO_MonitorConsumerAdmin*> (admin->_servant ());
00870
00871 if (low_admin == 0)
00872 {
00873
00874 throw CORBA::INTERNAL ();
00875 }
00876 else
00877 {
00878
00879 ACE_CString full = this->name_ + "/";
00880
00881 if (name == 0)
00882 {
00883 char idname[64];
00884 ACE_OS::sprintf (idname, "%d", id);
00885 full += idname;
00886 }
00887 else
00888 {
00889 full += name;
00890 }
00891
00892 ACE_WRITE_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, guard,
00893 this->consumeradmin_mutex_,
00894 CosNotifyChannelAdmin::ConsumerAdmin::_nil ());
00895
00896 if (this->is_duplicate_name (this->consumeradmin_map_, full))
00897 {
00898 throw NotifyMonitoringExt::NameAlreadyUsed ();
00899 }
00900
00901 if (this->consumeradmin_map_.bind (id, full) != 0)
00902 {
00903 throw NotifyMonitoringExt::NameMapError ();
00904 }
00905
00906 low_admin->register_stats_controls (this, full);
00907 }
00908
00909 return admin._retn ();
00910 }
00911
00912 CosNotifyChannelAdmin::SupplierAdmin_ptr
00913 TAO_MonitorEventChannel::new_for_suppliers (
00914 CosNotifyChannelAdmin::InterFilterGroupOperator op,
00915 CosNotifyChannelAdmin::AdminID_out id)
00916 {
00917 return this->named_new_for_suppliers (op, id, 0);
00918 }
00919
00920 CosNotifyChannelAdmin::SupplierAdmin_ptr
00921 TAO_MonitorEventChannel::named_new_for_suppliers (
00922 CosNotifyChannelAdmin::InterFilterGroupOperator op,
00923 CosNotifyChannelAdmin::AdminID_out id,
00924 const char* name)
00925 {
00926 if (name != 0 && name[0] == 0)
00927 {
00928 throw NotifyMonitoringExt::NameMapError ();
00929 }
00930
00931 CosNotifyChannelAdmin::SupplierAdmin_var admin =
00932 this->TAO_Notify_EventChannel::new_for_suppliers (op, id);
00933
00934
00935 TAO_MonitorSupplierAdmin* low_admin =
00936 dynamic_cast<TAO_MonitorSupplierAdmin*> (admin->_servant ());
00937
00938 if (low_admin == 0)
00939 {
00940
00941 throw CORBA::INTERNAL ();
00942 }
00943 else
00944 {
00945
00946 ACE_CString full = this->name_ + "/";
00947
00948 if (name == 0)
00949 {
00950 char idname[64];
00951 ACE_OS::sprintf (idname, "%d", id);
00952 full += idname;
00953 }
00954 else
00955 {
00956 full += name;
00957 }
00958
00959 ACE_WRITE_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, guard,
00960 this->supplieradmin_mutex_,
00961 CosNotifyChannelAdmin::SupplierAdmin::_nil ());
00962
00963 if (this->is_duplicate_name (this->supplieradmin_map_, full))
00964 {
00965 throw NotifyMonitoringExt::NameAlreadyUsed ();
00966 }
00967
00968 if (this->supplieradmin_map_.bind (id, full) != 0)
00969 {
00970 throw NotifyMonitoringExt::NameMapError ();
00971 }
00972
00973 low_admin->register_stats_controls (this, full);
00974 }
00975
00976 return admin._retn ();
00977 }
00978
00979 size_t
00980 TAO_MonitorEventChannel::get_consumers (
00981 Monitor_Control_Types::NameList* names)
00982 {
00983 size_t count = 0;
00984 CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids =
00985 this->get_all_consumeradmins ();
00986 CORBA::ULong length = conadmin_ids->length ();
00987
00988 for (CORBA::ULong j = 0; j < length; ++j)
00989 {
00990 CosNotifyChannelAdmin::ConsumerAdmin_var admin =
00991 this->get_consumeradmin (conadmin_ids[j]);
00992
00993 if (!CORBA::is_nil (admin.in ()))
00994 {
00995 CosNotifyChannelAdmin::ProxyIDSeq_var proxys =
00996 admin->push_suppliers ();
00997 CORBA::ULong plen = proxys->length ();
00998
00999 if (plen > 0)
01000 {
01001 ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX,
01002 guard,
01003 this->supplier_mutex_,
01004 0);
01005
01006 for (CORBA::ULong i = 0; i < plen; ++i)
01007 {
01008 if (names == 0)
01009 {
01010 if (this->supplier_map_.find (proxys[i]) == 0)
01011 {
01012 count++;
01013 }
01014 }
01015 else
01016 {
01017 ACE_CString name;
01018
01019 if (this->supplier_map_.find (proxys[i], name) == 0)
01020 {
01021 count++;
01022 names->push_back (name);
01023 }
01024 }
01025 }
01026 }
01027 }
01028 }
01029
01030 return count;
01031 }
01032
01033 void
01034 TAO_MonitorEventChannel::get_timedout_consumers (
01035 Monitor_Control_Types::NameList* names)
01036 {
01037 ACE_READ_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->timedout_supplier_mutex_);
01038
01039 Map::const_iterator itr (this->timedout_supplier_map_);
01040 Map::value_type* entry = 0;
01041 while (itr.next (entry))
01042 {
01043 names->push_back (entry->item ());
01044 itr.advance ();
01045 }
01046 }
01047
01048 size_t
01049 TAO_MonitorEventChannel::get_suppliers (
01050 Monitor_Control_Types::NameList* names)
01051 {
01052 size_t count = 0;
01053 CosNotifyChannelAdmin::AdminIDSeq_var supadmin_ids =
01054 this->get_all_supplieradmins ();
01055 CORBA::ULong length = supadmin_ids->length ();
01056
01057 for (CORBA::ULong j = 0; j < length; ++j)
01058 {
01059 CosNotifyChannelAdmin::SupplierAdmin_var admin =
01060 this->get_supplieradmin (supadmin_ids[j]);
01061
01062 if (!CORBA::is_nil (admin.in ()))
01063 {
01064 CosNotifyChannelAdmin::ProxyIDSeq_var proxys =
01065 admin->push_consumers ();
01066 CORBA::ULong plen = proxys->length ();
01067
01068 if (plen > 0)
01069 {
01070 ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX,
01071 guard,
01072 this->consumer_mutex_,
01073 0);
01074
01075 for (CORBA::ULong i = 0; i < plen; ++i)
01076 {
01077 if (names == 0)
01078 {
01079 if (this->consumer_map_.find (proxys[i]) == 0)
01080 {
01081 count++;
01082 }
01083 }
01084 else
01085 {
01086 ACE_CString name;
01087
01088 if (this->consumer_map_.find (proxys[i], name) == 0)
01089 {
01090 count++;
01091 names->push_back (name);
01092 }
01093 }
01094 }
01095 }
01096 }
01097 }
01098
01099 return count;
01100 }
01101
01102 size_t
01103 TAO_MonitorEventChannel::get_consumeradmins (
01104 Monitor_Control_Types::NameList* names)
01105 {
01106 ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX,
01107 guard,
01108 this->consumeradmin_mutex_,
01109 0);
01110
01111 CosNotifyChannelAdmin::AdminIDSeq_var admin_ids =
01112 this->get_all_consumeradmins ();
01113
01114 return this->get_admins (this->consumeradmin_map_,
01115 admin_ids.in (),
01116 names);
01117 }
01118
01119 size_t
01120 TAO_MonitorEventChannel::get_supplieradmins (
01121 Monitor_Control_Types::NameList* names)
01122 {
01123 ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX,
01124 guard,
01125 this->supplieradmin_mutex_,
01126 0);
01127
01128 CosNotifyChannelAdmin::AdminIDSeq_var admin_ids =
01129 this->get_all_supplieradmins ();
01130
01131 return this->get_admins (this->supplieradmin_map_,
01132 admin_ids.in (),
01133 names);
01134 }
01135
01136 size_t
01137 TAO_MonitorEventChannel::get_admins (
01138 TAO_MonitorEventChannel::Map& map,
01139 const CosNotifyChannelAdmin::AdminIDSeq& admin_ids,
01140 Monitor_Control_Types::NameList* names)
01141 {
01142 size_t count = 0;
01143 CORBA::ULong length = admin_ids.length ();
01144
01145 for (CORBA::ULong j = 0; j < length; ++j)
01146 {
01147 if (names == 0)
01148 {
01149 if (map.find (admin_ids[j]) == 0)
01150 {
01151 count++;
01152 }
01153 }
01154 else
01155 {
01156 ACE_CString name;
01157
01158 if (map.find (admin_ids[j], name) == 0)
01159 {
01160 count++;
01161 names->push_back (name);
01162 }
01163 }
01164 }
01165
01166 return count;
01167 }
01168
01169 TAO_Notify_ThreadPool_Task*
01170 TAO_MonitorEventChannel::get_threadpool_task (
01171 CosNotifyChannelAdmin::AdminID id)
01172 {
01173 CosNotifyChannelAdmin::ConsumerAdmin_var admin =
01174 this->get_consumeradmin (id);
01175
01176 if (!CORBA::is_nil (admin.in ()))
01177 {
01178
01179
01180 TAO_Notify_ConsumerAdmin* low_admin =
01181 dynamic_cast<TAO_Notify_ConsumerAdmin*> (admin->_servant ());
01182
01183 if (low_admin != 0)
01184 {
01185 return
01186 dynamic_cast<TAO_Notify_ThreadPool_Task*> (
01187 low_admin->get_worker_task ());
01188 }
01189 }
01190
01191 return 0;
01192 }
01193
01194 size_t
01195 TAO_MonitorEventChannel::calculate_queue_size (bool count)
01196 {
01197 size_t size = 0;
01198 CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids =
01199 this->get_all_consumeradmins ();
01200 CORBA::ULong length = conadmin_ids->length ();
01201
01202 for (CORBA::ULong j = 0; j < length; j++)
01203 {
01204 TAO_Notify_ThreadPool_Task* task =
01205 this->get_threadpool_task (conadmin_ids[j]);
01206
01207 if (task != 0)
01208 {
01209 TAO_Notify_Message_Queue* queue = task->msg_queue ();
01210
01211 if (count)
01212 {
01213 size += queue->message_count ();
01214 }
01215 else
01216 {
01217
01218
01219
01220
01221
01222 size += (queue->message_count () );
01223 }
01224 }
01225 }
01226
01227 return size;
01228 }
01229
01230 void
01231 TAO_MonitorEventChannel::determine_slowest_consumer (
01232 Monitor_Control_Types::NameList* names)
01233 {
01234 size_t largest = 0;
01235 CosNotifyChannelAdmin::AdminID id = 0;
01236
01237 CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids =
01238 this->get_all_consumeradmins ();
01239 CORBA::ULong length = conadmin_ids->length ();
01240
01241 for (CORBA::ULong j = 0; j < length; ++j)
01242 {
01243 TAO_Notify_ThreadPool_Task* task =
01244 this->get_threadpool_task (conadmin_ids[j]);
01245
01246 if (task != 0)
01247 {
01248 TAO_Notify_Message_Queue* queue = task->msg_queue ();
01249 size_t count = queue->message_count ();
01250
01251 if (count > largest)
01252 {
01253 largest = count;
01254 id = conadmin_ids[j];
01255 }
01256 }
01257 }
01258
01259 if (largest > 0)
01260 {
01261 CosNotifyChannelAdmin::ConsumerAdmin_var admin =
01262 this->get_consumeradmin (id);
01263
01264 if (!CORBA::is_nil (admin.in ()))
01265 {
01266 CosNotifyChannelAdmin::ProxyIDSeq_var proxys =
01267 admin->push_suppliers ();
01268 CORBA::ULong plen = proxys->length ();
01269
01270 if (plen > 0)
01271 {
01272 ACE_READ_GUARD (TAO_SYNCH_RW_MUTEX,
01273 guard,
01274 this->supplier_mutex_);
01275
01276 for (CORBA::ULong i = 0; i < plen; ++i)
01277 {
01278 ACE_CString name;
01279
01280 if (this->supplier_map_.find (proxys[i], name) == 0)
01281 {
01282 names->push_back (name);
01283 }
01284 }
01285 }
01286 }
01287 }
01288 }
01289
01290 double
01291 TAO_MonitorEventChannel::get_oldest_event (void)
01292 {
01293 CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids =
01294 this->get_all_consumeradmins ();
01295 CORBA::ULong length = conadmin_ids->length ();
01296
01297
01298 if (length == 0)
01299 {
01300 return 0.0;
01301 }
01302
01303
01304
01305 ACE_Time_Value tv (ACE_Time_Value::max_time);
01306
01307 for (CORBA::ULong j = 0; j < length; ++j)
01308 {
01309 TAO_Notify_ThreadPool_Task* task =
01310 this->get_threadpool_task (conadmin_ids[j]);
01311
01312 if (task != 0)
01313 {
01314 ACE_Time_Value old =
01315 task->buffering_strategy ()->oldest_event ();
01316
01317 if (old < tv)
01318 {
01319 tv = old;
01320 }
01321 }
01322 }
01323
01324
01325
01326
01327 if (tv == ACE_Time_Value::max_time)
01328 {
01329 return 0.0;
01330 }
01331
01332 return tv.sec () + (tv.usec () / 1000000.0);
01333 }
01334
01335 bool
01336 TAO_MonitorEventChannel::is_duplicate_name (
01337 const TAO_MonitorEventChannel::Map& map,
01338 const ACE_CString& name) const
01339 {
01340 Map::const_iterator itr (map);
01341 Map::value_type* entry = 0;
01342
01343 while (itr.next (entry))
01344 {
01345 if (name == entry->item ())
01346 {
01347 return true;
01348 }
01349
01350 itr.advance ();
01351 }
01352
01353 return false;
01354 }
01355
01356 void
01357 TAO_MonitorEventChannel::remove_list_name (
01358 Monitor_Control_Types::NameList& list,
01359 const ACE_CString& name)
01360 {
01361 size_t size = list.size ();
01362
01363 for (size_t i = 0; i < size; ++i)
01364 {
01365 if (list[i] == name)
01366 {
01367 if (size == 1)
01368 {
01369 list.clear ();
01370 }
01371 else
01372 {
01373 if (i != size - 1)
01374 {
01375 list[i] = list[size - 1];
01376 }
01377
01378 list.resize (size - 1, "");
01379 }
01380
01381 break;
01382 }
01383 }
01384 }
01385
01386 bool
01387 TAO_MonitorEventChannel::destroy_consumer (
01388 CosNotifyChannelAdmin::ProxyID id)
01389 {
01390
01391
01392 CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids =
01393 this->get_all_consumeradmins ();
01394 CORBA::ULong length = conadmin_ids->length ();
01395
01396 for (CORBA::ULong j = 0; j < length; ++j)
01397 {
01398 CosNotifyChannelAdmin::ConsumerAdmin_var admin =
01399 this->get_consumeradmin (conadmin_ids[j]);
01400
01401 if (!CORBA::is_nil (admin.in ()))
01402 {
01403 try
01404 {
01405 CosNotifyChannelAdmin::ProxySupplier_var supplier =
01406 admin->get_proxy_supplier(id);
01407
01408 if (!CORBA::is_nil(supplier.in()))
01409 {
01410
01411
01412 TAO_Notify_ProxySupplier* low_proxy =
01413 dynamic_cast<TAO_Notify_ProxySupplier*> (
01414 supplier->_servant ());
01415
01416 low_proxy->destroy ();
01417 return true;
01418 }
01419 }
01420 catch(const CosNotifyChannelAdmin::ProxyNotFound&)
01421 {
01422 }
01423 }
01424 }
01425
01426 return false;
01427 }
01428
01429 bool
01430 TAO_MonitorEventChannel::destroy_supplier (
01431 CosNotifyChannelAdmin::ProxyID id)
01432 {
01433
01434
01435 CosNotifyChannelAdmin::AdminIDSeq_var supadmin_ids =
01436 this->get_all_supplieradmins ();
01437 CORBA::ULong length = supadmin_ids->length ();
01438
01439 for (CORBA::ULong j = 0; j < length; ++j)
01440 {
01441 CosNotifyChannelAdmin::SupplierAdmin_var admin =
01442 this->get_supplieradmin (supadmin_ids[j]);
01443
01444 if (!CORBA::is_nil (admin.in ()))
01445 {
01446 try
01447 {
01448 CosNotifyChannelAdmin::ProxyConsumer_var consumer =
01449 admin->get_proxy_consumer(id);
01450
01451 if (!CORBA::is_nil(consumer.in()))
01452 {
01453
01454
01455 TAO_Notify_ProxyConsumer* low_proxy =
01456 dynamic_cast<TAO_Notify_ProxyConsumer*> (
01457 consumer->_servant ());
01458
01459 low_proxy->destroy ();
01460 return true;
01461 }
01462 }
01463 catch(const CosNotifyChannelAdmin::ProxyNotFound&)
01464 {
01465 }
01466 }
01467 }
01468
01469 return false;
01470 }
01471
01472 TAO_END_VERSIONED_NAMESPACE_DECL
01473
01474 #endif
01475