00001
00002
00003 #include "orbsvcs/Event/EC_Default_ProxyConsumer.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_Supplier_Filter_Builder.h"
00006 #include "orbsvcs/Event/EC_Supplier_Filter.h"
00007 #include "ace/Synch_T.h"
00008
00009 ACE_RCSID(Event, EC_Default_ProxyConsumer, "EC_Default_ProxyConsumer.cpp,v 1.6 2006/03/14 06:14:25 jtc Exp")
00010
00011 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00012
// Reverse-lock adapter over ACE_Lock.  connect_push_supplier() wraps
// *this->lock_ in one of these and guards it while it calls back into the
// event channel; per ACE's ACE_Reverse_Lock, acquiring the adapter releases
// the wrapped lock and releasing the adapter re-acquires it.
00013 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00014
00015 TAO_EC_Default_ProxyPushConsumer::
00016 TAO_EC_Default_ProxyPushConsumer (TAO_EC_Event_Channel_Base* ec)
00017 : TAO_EC_ProxyPushConsumer (ec)
00018 {
00019 }
00020
00021 TAO_EC_Default_ProxyPushConsumer::~TAO_EC_Default_ProxyPushConsumer (void)
00022 {
00023 }
00024
00025 void
00026 TAO_EC_Default_ProxyPushConsumer::connect_push_supplier (
00027 RtecEventComm::PushSupplier_ptr push_supplier,
00028 const RtecEventChannelAdmin::SupplierQOS& qos
00029 ACE_ENV_ARG_DECL)
00030 ACE_THROW_SPEC ((CORBA::SystemException,
00031 RtecEventChannelAdmin::AlreadyConnected))
00032 {
00033 {
00034 ACE_GUARD_THROW_EX (
00035 ACE_Lock, ace_mon, *this->lock_,
00036 CORBA::INTERNAL ());
00037
00038 ACE_CHECK;
00039
00040 if (this->is_connected_i ())
00041 {
00042 if (this->event_channel_->supplier_reconnect () == 0)
00043 ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
00044
00045
00046
00047 this->cleanup_i ();
00048
00049
00050
00051 TAO_EC_Unlock reverse_lock (*this->lock_);
00052
00053 {
00054 ACE_GUARD_THROW_EX (
00055 TAO_EC_Unlock, ace_mon, reverse_lock,
00056 CORBA::INTERNAL ());
00057
00058 ACE_CHECK;
00059
00060 this->event_channel_->reconnected (this ACE_ENV_ARG_PARAMETER);
00061 ACE_CHECK;
00062 }
00063
00064
00065
00066
00067 if (this->is_connected_i ())
00068 return;
00069 }
00070
00071 this->supplier_ =
00072 RtecEventComm::PushSupplier::_duplicate (push_supplier);
00073 this->connected_ = 1;
00074 this->qos_ = qos;
00075
00076 #if TAO_EC_ENABLE_DEBUG_MESSAGES
00077 ACE_DEBUG ((LM_DEBUG,
00078 "Building filter for supplier <%x>.\n",
00079 this));
00080 #endif
00081 this->filter_ =
00082 this->event_channel_->supplier_filter_builder ()->create (this->qos_);
00083 this->filter_->bind (this);
00084 }
00085
00086
00087 this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
00088 }
00089
00090 void
00091 TAO_EC_Default_ProxyPushConsumer::push (const RtecEventComm::EventSet& event
00092 ACE_ENV_ARG_DECL)
00093 ACE_THROW_SPEC ((CORBA::SystemException))
00094 {
00095 TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_,
00096 this->refcount_,
00097 this->event_channel_,
00098 this);
00099 if (!ace_mon.locked ())
00100 return;
00101
00102 ace_mon.filter->push (event, this
00103 ACE_ENV_ARG_PARAMETER);
00104 ACE_CHECK;
00105 }
00106
00107 void
00108 TAO_EC_Default_ProxyPushConsumer::disconnect_push_consumer (
00109 ACE_ENV_SINGLE_ARG_DECL)
00110 ACE_THROW_SPEC ((CORBA::SystemException))
00111 {
00112 RtecEventComm::PushSupplier_var supplier;
00113 int connected = 0;
00114
00115 {
00116 ACE_GUARD_THROW_EX (
00117 ACE_Lock, ace_mon, *this->lock_,
00118 CORBA::INTERNAL ());
00119
00120 ACE_CHECK;
00121
00122 connected = this->is_connected_i ();
00123 supplier = this->supplier_._retn ();
00124 this->connected_ = 0;
00125
00126 if (connected)
00127 this->cleanup_i ();
00128 }
00129
00130
00131 this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00132 ACE_CHECK;
00133
00134 if (CORBA::is_nil (supplier.in ()))
00135 {
00136 return;
00137 }
00138
00139 if (this->event_channel_->disconnect_callbacks ())
00140 {
00141 ACE_TRY
00142 {
00143 supplier->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00144 ACE_TRY_CHECK;
00145 }
00146 ACE_CATCHANY
00147 {
00148
00149
00150 }
00151 ACE_ENDTRY;
00152 }
00153 }
00154
00155 PortableServer::POA_ptr
00156 TAO_EC_Default_ProxyPushConsumer::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00157 {
00158 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00159 }
00160
00161 void
00162 TAO_EC_Default_ProxyPushConsumer::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00163 {
00164 this->_incr_refcnt ();
00165 }
00166
00167 void
00168 TAO_EC_Default_ProxyPushConsumer::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00169 {
00170 this->_decr_refcnt ();
00171 }
00172
00173 void
00174 TAO_EC_Default_ProxyPushConsumer::activate (
00175 RtecEventChannelAdmin::ProxyPushConsumer_ptr &proxy
00176 ACE_ENV_ARG_DECL)
00177 ACE_THROW_SPEC ((CORBA::SystemException))
00178 {
00179 proxy = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00180 ACE_CHECK;
00181 }
00182
00183 PortableServer::ObjectId
00184 TAO_EC_Default_ProxyPushConsumer::object_id (ACE_ENV_SINGLE_ARG_DECL)
00185 ACE_THROW_SPEC ((CORBA::SystemException))
00186 {
00187 PortableServer::ObjectId_var result =
00188 this->default_POA_->servant_to_id (this ACE_ENV_ARG_PARAMETER);
00189 return result.in ();
00190 }
00191
00192 TAO_END_VERSIONED_NAMESPACE_DECL