Go to the documentation of this file.00001
00002
00003 #include "orbsvcs/Notify/ProxyConsumer.h"
00004
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/ProxyConsumer.inl"
00007 #endif
00008
00009 ACE_RCSID(Notify, TAO_Notify_ProxyConsumer, "$Id: ProxyConsumer.cpp 84728 2009-03-05 19:58:54Z dai_y $")
00010
00011 #include "tao/debug.h"
00012 #include "ace/Atomic_Op.h"
00013 #include "orbsvcs/Notify/Supplier.h"
00014 #include "orbsvcs/Notify/AdminProperties.h"
00015 #include "orbsvcs/Notify/Property.h"
00016 #include "orbsvcs/Notify/Proxy.h"
00017 #include "orbsvcs/Notify/Event_Manager.h"
00018 #include "orbsvcs/Notify/Method_Request_Lookup.h"
00019 #include "orbsvcs/Notify/Worker_Task.h"
00020 #include "orbsvcs/Notify/Properties.h"
00021 #include "orbsvcs/Notify/SupplierAdmin.h"
00022 #include "orbsvcs/Notify/EventChannel.h"
00023 #include "orbsvcs/Notify/Routing_Slip.h"
00024
00025
// Verbosity threshold consulted by the debug output in this file.  A build
// may predefine DEBUG_LEVEL; otherwise it falls back to TAO_debug_level.
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */
00029
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032 TAO_Notify_ProxyConsumer::TAO_Notify_ProxyConsumer (void)
00033 : supplier_admin_ (0)
00034 {
00035 }
00036
00037 TAO_Notify_ProxyConsumer::~TAO_Notify_ProxyConsumer ()
00038 {
00039 }
00040
00041 TAO_Notify_Peer*
00042 TAO_Notify_ProxyConsumer::peer (void)
00043 {
00044 return this->supplier ();
00045 }
00046
00047 void
00048 TAO_Notify_ProxyConsumer::init (TAO_Notify::Topology_Parent* topology_parent)
00049 {
00050 ACE_ASSERT( this->supplier_admin_.get() == 0 );
00051
00052 TAO_Notify_Proxy::initialize (topology_parent);
00053
00054 this->supplier_admin_.reset (dynamic_cast<TAO_Notify_SupplierAdmin *>(topology_parent));
00055 ACE_ASSERT (this->supplier_admin_.get() != 0);
00056
00057 this->filter_admin_.event_channel (this->supplier_admin_->event_channel());
00058
00059 const CosNotification::QoSProperties &default_ps_qos =
00060 TAO_Notify_PROPERTIES::instance ()->default_proxy_consumer_qos_properties ();
00061
00062 {
00063 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00064 CORBA::INTERNAL ());
00065 this->TAO_Notify_Object::set_qos (default_ps_qos);
00066 }
00067 }
00068
00069 void
00070 TAO_Notify_ProxyConsumer::connect (TAO_Notify_Supplier *supplier)
00071 {
00072
00073 ACE_Auto_Ptr< TAO_Notify_Supplier > auto_supplier (supplier);
00074
00075 TAO_Notify_Atomic_Property_Long& supplier_count = this->admin_properties().suppliers ();
00076 const TAO_Notify_Property_Long& max_suppliers = this->admin_properties().max_suppliers ();
00077
00078 if (max_suppliers != 0 && supplier_count >= max_suppliers.value ())
00079 {
00080 throw CORBA::IMP_LIMIT (
00081 );
00082 }
00083
00084 {
00085 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00086 CORBA::INTERNAL ());
00087
00088
00089 if (this->is_connected () && TAO_Notify_PROPERTIES::instance()->allow_reconnect() == false)
00090 {
00091 throw CosEventChannelAdmin::AlreadyConnected ();
00092 }
00093
00094
00095 this->supplier_ = auto_supplier;
00096
00097 this->supplier_admin_->subscribed_types (this->subscribed_types_);
00098 }
00099
00100
00101 ACE_ASSERT (this->supplier_.get() != 0);
00102 this->supplier_->qos_changed (this->qos_properties_);
00103
00104 TAO_Notify_EventTypeSeq removed;
00105
00106 this->event_manager().offer_change (this, this->subscribed_types_, removed);
00107
00108 this->event_manager().connect (this);
00109
00110
00111 ++supplier_count;
00112 }
00113 void
00114 TAO_Notify_ProxyConsumer::push_i (TAO_Notify_Event * event)
00115 {
00116 last_ping_ = ACE_OS::gettimeofday ();
00117
00118 if (this->supports_reliable_events ())
00119 {
00120 TAO_Notify_Event::Ptr pevent(event->queueable_copy());
00121 TAO_Notify::Routing_Slip_Ptr routing_slip =
00122 TAO_Notify::Routing_Slip::create (pevent);
00123 if (DEBUG_LEVEL > 0)
00124 ACE_DEBUG((LM_DEBUG, ACE_TEXT ("ProxyConsumer routing event.\n")));
00125 routing_slip->route (this, true);
00126 routing_slip->wait_persist ();
00127 }
00128 else
00129 {
00130 TAO_Notify_Method_Request_Lookup_No_Copy request (event, this);
00131 this->execute_task (request);
00132 }
00133 }
00134
00135 bool
00136 TAO_Notify_ProxyConsumer::supports_reliable_events () const
00137 {
00138 bool reliable = false;
00139 CosNotification::PropertyValue value;
00140 if (this->find_qos_property_value (CosNotification::EventReliability, value))
00141 {
00142 CORBA::Short setting;
00143 if (value >>= setting)
00144 {
00145 reliable = (setting == CosNotification::Persistent);
00146 }
00147 }
00148 return reliable;
00149 }
00150
00151 void
00152 TAO_Notify_ProxyConsumer::disconnect (void)
00153 {
00154 TAO_Notify_EventTypeSeq added;
00155
00156 event_manager().offer_change (this, added, this->subscribed_types_);
00157
00158 this->event_manager().disconnect (this);
00159
00160
00161 this->admin_properties().suppliers ()--;
00162 }
00163
00164 int
00165 TAO_Notify_ProxyConsumer::shutdown (void)
00166 {
00167 if (this->TAO_Notify_Object::shutdown () == 1)
00168 return 1;
00169
00170
00171 this->disconnect ();
00172
00173 if (this->supplier_.get() != 0)
00174 {
00175 this->supplier_->shutdown ();
00176 }
00177 return 0;
00178 }
00179
00180 void
00181 TAO_Notify_ProxyConsumer::destroy (void)
00182 {
00183 this->shutdown ();
00184 this->supplier_admin_->cleanup_proxy (this, false, false);
00185
00186
00187
00188 }
00189
00190
00191 ACE_Time_Value
00192 TAO_Notify_ProxyConsumer::last_ping() const
00193 {
00194 return this->last_ping_.value ();
00195 }
00196
00197
00198 void
00199 TAO_Notify_ProxyConsumer::last_ping(const ACE_Time_Value& tv)
00200 {
00201 this->last_ping_ = tv;
00202 }
00203
00204
00205 TAO_END_VERSIONED_NAMESPACE_DECL