00001
00002
00003 #include "orbsvcs/Event/EC_Default_ProxySupplier.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_Filter_Builder.h"
00006
00007 #include "ace/Reverse_Lock_T.h"
00008
00009 ACE_RCSID (Event,
00010 EC_ProxySupplier,
00011 "$Id: EC_Default_ProxySupplier.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00012
00013 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00014
// Reverse-lock adapter over ACE_Lock.  connect_push_consumer() guards an
// instance of this type around the event channel's reconnected() callback;
// per ACE_Reverse_Lock semantics that guard is expected to release the
// wrapped lock on entry and re-acquire it on exit.
00015 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00016
00017 TAO_EC_Default_ProxyPushSupplier::TAO_EC_Default_ProxyPushSupplier (
00018 TAO_EC_Event_Channel_Base* ec,
00019 int validate_connection)
00020 : TAO_EC_ProxyPushSupplier (ec, validate_connection)
00021 {
00022 }
00023
00024 TAO_EC_Default_ProxyPushSupplier::~TAO_EC_Default_ProxyPushSupplier (void)
00025 {
00026 }
00027
00028 void
00029 TAO_EC_Default_ProxyPushSupplier::connect_push_consumer (
00030 RtecEventComm::PushConsumer_ptr push_consumer,
00031 const RtecEventChannelAdmin::ConsumerQOS& qos)
00032 {
00033
00034 if (CORBA::is_nil (push_consumer))
00035 throw CORBA::BAD_PARAM ();
00036
00037 {
00038 ACE_GUARD_THROW_EX (
00039 ACE_Lock, ace_mon, *this->lock_,
00040 CORBA::INTERNAL ());
00041
00042
00043 if (this->is_connected_i ())
00044 {
00045 if (this->event_channel_->consumer_reconnect () == 0)
00046 throw RtecEventChannelAdmin::AlreadyConnected ();
00047
00048
00049 this->cleanup_i ();
00050
00051 this->consumer_ =
00052 RtecEventComm::PushConsumer::_duplicate (push_consumer);
00053 this->qos_ = qos;
00054 this->child_ =
00055 this->event_channel_->filter_builder ()->build (this,
00056 this->qos_);
00057
00058 this->adopt_child (this->child_);
00059
00060 TAO_EC_Unlock reverse_lock (*this->lock_);
00061
00062 {
00063 ACE_GUARD_THROW_EX (
00064 TAO_EC_Unlock, ace_mon, reverse_lock,
00065 CORBA::INTERNAL ());
00066
00067
00068 this->event_channel_->reconnected (this);
00069 }
00070
00071
00072
00073
00074 if (this->is_connected_i ())
00075 return;
00076 }
00077
00078 #if (TAO_HAS_CORBA_MESSAGING == 1)
00079 if ( consumer_validate_connection_ == 1 )
00080 {
00081
00082 CORBA::PolicyList_var unused;
00083 int status = push_consumer->_validate_connection (unused);
00084 #if TAO_EC_ENABLE_DEBUG_MESSAGES
00085 ACE_DEBUG ((LM_DEBUG, "Validated connection to PushConsumer on connect. Status[%d]\n", status));
00086 #else
00087 ACE_UNUSED_ARG(status);
00088 #endif
00089 }
00090 #endif
00091
00092 this->consumer_ =
00093 RtecEventComm::PushConsumer::_duplicate (push_consumer);
00094 this->qos_ = qos;
00095
00096 #if TAO_EC_ENABLE_DEBUG_MESSAGES
00097 ACE_DEBUG ((LM_DEBUG,
00098 "Building filters for consumer <%x>.\n",
00099 this));
00100 #endif
00101 this->child_ =
00102 this->event_channel_->filter_builder ()->build (this,
00103 this->qos_);
00104
00105 this->adopt_child (this->child_);
00106 }
00107
00108
00109 this->event_channel_->connected (this);
00110 }
00111
00112 void
00113 TAO_EC_Default_ProxyPushSupplier::disconnect_push_supplier ()
00114 {
00115 RtecEventComm::PushConsumer_var consumer;
00116 int connected = 0;
00117
00118 {
00119 ACE_GUARD_THROW_EX (
00120 ACE_Lock, ace_mon, *this->lock_,
00121 CORBA::INTERNAL ());
00122
00123
00124 connected = this->is_connected_i ();
00125 consumer = this->consumer_._retn ();
00126
00127 if (connected)
00128 this->cleanup_i ();
00129 }
00130
00131
00132 this->event_channel_->disconnected (this);
00133
00134 if (!connected)
00135 {
00136 return;
00137 }
00138
00139 if (this->event_channel_->disconnect_callbacks ())
00140 {
00141 try
00142 {
00143 consumer->disconnect_push_consumer ();
00144 }
00145 catch (const CORBA::Exception& ex)
00146 {
00147
00148
00149 ex._tao_print_exception ("ProxySupplier::disconnect_push_supplier");
00150 }
00151 }
00152 }
00153
00154 void
00155 TAO_EC_Default_ProxyPushSupplier::suspend_connection (void)
00156 {
00157 this->suspend_connection_locked ();
00158 }
00159
00160 void
00161 TAO_EC_Default_ProxyPushSupplier::resume_connection (void)
00162 {
00163 this->resume_connection_locked ();
00164 }
00165
00166 PortableServer::POA_ptr
00167 TAO_EC_Default_ProxyPushSupplier::_default_POA (void)
00168 {
00169 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00170 }
00171
00172 void
00173 TAO_EC_Default_ProxyPushSupplier::_add_ref (void)
00174 {
00175 this->_incr_refcnt ();
00176 }
00177
00178 void
00179 TAO_EC_Default_ProxyPushSupplier::_remove_ref (void)
00180 {
00181 this->_decr_refcnt ();
00182 }
00183
00184 void
00185 TAO_EC_Default_ProxyPushSupplier::activate (
00186 RtecEventChannelAdmin::ProxyPushSupplier_ptr &proxy)
00187 {
00188 proxy = this->_this ();
00189 }
00190
00191 PortableServer::ObjectId
00192 TAO_EC_Default_ProxyPushSupplier::object_id (void)
00193 {
00194 PortableServer::ObjectId_var result =
00195 this->default_POA_->servant_to_id (this);
00196
00197 return result.in ();
00198 }
00199
00200 TAO_END_VERSIONED_NAMESPACE_DECL