00001
00002
00003 #include "orbsvcs/Notify/ProxyConsumer.h"
00004
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/ProxyConsumer.inl"
00007 #endif
00008
00009 ACE_RCSID(Notify, TAO_Notify_ProxyConsumer, "$Id: ProxyConsumer.cpp 79750 2007-10-04 19:07:26Z johnc $")
00010
00011 #include "tao/debug.h"
00012 #include "ace/Atomic_Op.h"
00013 #include "orbsvcs/Notify/Supplier.h"
00014 #include "orbsvcs/Notify/AdminProperties.h"
00015 #include "orbsvcs/Notify/Property.h"
00016 #include "orbsvcs/Notify/Proxy.h"
00017 #include "orbsvcs/Notify/Event_Manager.h"
00018 #include "orbsvcs/Notify/Method_Request_Lookup.h"
00019 #include "orbsvcs/Notify/Worker_Task.h"
00020 #include "orbsvcs/Notify/Properties.h"
00021 #include "orbsvcs/Notify/SupplierAdmin.h"
00022 #include "orbsvcs/Notify/EventChannel.h"
00023 #include "orbsvcs/Notify/Routing_Slip.h"
00024
00025
// Verbosity threshold used by the debug output in this file.  Defaults to
// TAO_debug_level unless DEBUG_LEVEL has already been defined.
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */
00029
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032 TAO_Notify_ProxyConsumer::TAO_Notify_ProxyConsumer (void)
00033 : supplier_admin_ (0)
00034 {
00035 }
00036
00037 TAO_Notify_ProxyConsumer::~TAO_Notify_ProxyConsumer ()
00038 {
00039 }
00040
00041 TAO_Notify_Peer*
00042 TAO_Notify_ProxyConsumer::peer (void)
00043 {
00044 return this->supplier ();
00045 }
00046
00047 void
00048 TAO_Notify_ProxyConsumer::init (TAO_Notify::Topology_Parent* topology_parent)
00049 {
00050 ACE_ASSERT( this->supplier_admin_.get() == 0 );
00051
00052 TAO_Notify_Proxy::initialize (topology_parent);
00053
00054 this->supplier_admin_.reset (dynamic_cast<TAO_Notify_SupplierAdmin *>(topology_parent));
00055 ACE_ASSERT (this->supplier_admin_.get() != 0);
00056
00057 const CosNotification::QoSProperties &default_ps_qos =
00058 TAO_Notify_PROPERTIES::instance ()->default_proxy_consumer_qos_properties ();
00059
00060 {
00061 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00062 CORBA::INTERNAL ());
00063 this->TAO_Notify_Object::set_qos (default_ps_qos);
00064 }
00065 }
00066
00067 void
00068 TAO_Notify_ProxyConsumer::connect (TAO_Notify_Supplier *supplier)
00069 {
00070
00071 ACE_Auto_Ptr< TAO_Notify_Supplier > auto_supplier (supplier);
00072
00073 TAO_Notify_Atomic_Property_Long& supplier_count = this->admin_properties().suppliers ();
00074 const TAO_Notify_Property_Long& max_suppliers = this->admin_properties().max_suppliers ();
00075
00076 if (max_suppliers != 0 && supplier_count >= max_suppliers.value ())
00077 {
00078 throw CORBA::IMP_LIMIT (
00079 );
00080 }
00081
00082 {
00083 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00084 CORBA::INTERNAL ());
00085
00086
00087 if (this->is_connected () && TAO_Notify_PROPERTIES::instance()->allow_reconnect() == false)
00088 {
00089 throw CosEventChannelAdmin::AlreadyConnected ();
00090 }
00091
00092
00093 this->supplier_ = auto_supplier;
00094
00095 this->supplier_admin_->subscribed_types (this->subscribed_types_);
00096 }
00097
00098
00099 ACE_ASSERT (this->supplier_.get() != 0);
00100 this->supplier_->qos_changed (this->qos_properties_);
00101
00102 TAO_Notify_EventTypeSeq removed;
00103
00104 this->event_manager().offer_change (this, this->subscribed_types_, removed);
00105
00106 this->event_manager().connect (this);
00107
00108
00109 ++supplier_count;
00110 }
00111 void
00112 TAO_Notify_ProxyConsumer::push_i (TAO_Notify_Event * event)
00113 {
00114 if (this->supports_reliable_events ())
00115 {
00116 TAO_Notify_Event::Ptr pevent(event->queueable_copy());
00117 TAO_Notify::Routing_Slip_Ptr routing_slip =
00118 TAO_Notify::Routing_Slip::create (pevent);
00119 if (DEBUG_LEVEL > 0)
00120 ACE_DEBUG((LM_DEBUG, ACE_TEXT ("ProxyConsumer routing event.\n")));
00121 routing_slip->route (this, true);
00122 routing_slip->wait_persist ();
00123 }
00124 else
00125 {
00126 TAO_Notify_Method_Request_Lookup_No_Copy request (event, this);
00127 this->execute_task (request);
00128 }
00129 }
00130
00131 bool
00132 TAO_Notify_ProxyConsumer::supports_reliable_events () const
00133 {
00134 bool reliable = false;
00135 CosNotification::PropertyValue value;
00136 if (this->find_qos_property_value (CosNotification::EventReliability, value))
00137 {
00138 CORBA::Short setting;
00139 if (value >>= setting)
00140 {
00141 reliable = (setting == CosNotification::Persistent);
00142 }
00143 }
00144 return reliable;
00145 }
00146
00147 void
00148 TAO_Notify_ProxyConsumer::disconnect (void)
00149 {
00150 TAO_Notify_EventTypeSeq added;
00151
00152 event_manager().offer_change (this, added, this->subscribed_types_);
00153
00154 this->event_manager().disconnect (this);
00155
00156
00157 this->admin_properties().suppliers ()--;
00158 }
00159
00160 int
00161 TAO_Notify_ProxyConsumer::shutdown (void)
00162 {
00163 if (this->TAO_Notify_Object::shutdown () == 1)
00164 return 1;
00165
00166
00167 this->disconnect ();
00168
00169 if (this->supplier_.get() != 0)
00170 {
00171 this->supplier_->shutdown ();
00172 }
00173 return 0;
00174 }
00175
00176 void
00177 TAO_Notify_ProxyConsumer::destroy (void)
00178 {
00179 this->shutdown ();
00180 this->supplier_admin_->cleanup_proxy (this, false);
00181
00182
00183
00184 }
00185
00186 TAO_END_VERSIONED_NAMESPACE_DECL