00001
00002
00003 #include "orbsvcs/Event/EC_Per_Supplier_Filter.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_ProxySupplier.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_Scheduling_Strategy.h"
00008 #include "orbsvcs/Event/EC_QOS_Info.h"
00009
00010 #include "orbsvcs/ESF/ESF_Proxy_Collection.h"
00011
00012 #include "orbsvcs/Event_Service_Constants.h"
00013
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Per_Supplier_Filter.inl"
00016 #endif
00017
00018 ACE_RCSID(Event, EC_Per_Supplier_Filter, "$Id: EC_Per_Supplier_Filter.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_EC_Per_Supplier_Filter::
00023 TAO_EC_Per_Supplier_Filter (TAO_EC_Event_Channel_Base* ec)
00024 : event_channel_ (ec),
00025 consumer_ (0),
00026 refcnt_ (1)
00027 {
00028 this->event_channel_->create_proxy_collection (this->collection_);
00029 }
00030
00031 TAO_EC_Per_Supplier_Filter::~TAO_EC_Per_Supplier_Filter (void)
00032 {
00033 this->event_channel_->destroy_proxy_collection (this->collection_);
00034 this->collection_ = 0;
00035 }
00036
00037 void
00038 TAO_EC_Per_Supplier_Filter::bind (TAO_EC_ProxyPushConsumer* consumer)
00039 {
00040 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00041
00042 if (this->consumer_ != 0)
00043 return;
00044
00045 this->consumer_ = consumer;
00046 }
00047
00048 void
00049 TAO_EC_Per_Supplier_Filter::unbind (TAO_EC_ProxyPushConsumer* consumer)
00050 {
00051 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00052
00053 if (this->consumer_ == 0 || this->consumer_ != consumer)
00054 return;
00055
00056 this->consumer_ = 0;
00057
00058 try
00059 {
00060 this->shutdown ();
00061 }
00062 catch (const CORBA::Exception&)
00063 {
00064
00065 }
00066 }
00067
00068 void
00069 TAO_EC_Per_Supplier_Filter::connected (TAO_EC_ProxyPushSupplier* supplier)
00070 {
00071 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00072
00073 if (this->consumer_ == 0)
00074 return;
00075
00076 const RtecEventChannelAdmin::SupplierQOS& pub =
00077 this->consumer_->publications_i ();
00078
00079 for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
00080 {
00081 const RtecEventComm::Event& event =
00082 pub.publications[j].event;
00083
00084 #if TAO_EC_ENABLE_DEBUG_MESSAGES
00085 ACE_DEBUG ((LM_DEBUG, "Connecting consumer <%x> to <%x>, "
00086 "trying event <%d:%d> ",
00087 supplier, this,
00088 event.header.source, event.header.type));
00089 #endif
00090 if (supplier->can_match (event.header))
00091 {
00092 #if TAO_EC_ENABLE_DEBUG_MESSAGES
00093 ACE_DEBUG ((LM_DEBUG, " matched\n"));
00094 #endif
00095 this->collection_->connected (supplier);
00096 return;
00097 }
00098 #if TAO_EC_ENABLE_DEBUG_MESSAGES
00099 ACE_DEBUG ((LM_DEBUG, " not matched\n"));
00100 #endif
00101 }
00102 }
00103
00104 void
00105 TAO_EC_Per_Supplier_Filter::reconnected (TAO_EC_ProxyPushSupplier* supplier)
00106 {
00107 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00108
00109 if (this->consumer_ == 0)
00110 return;
00111
00112 const RtecEventChannelAdmin::SupplierQOS& pub =
00113 this->consumer_->publications_i ();
00114
00115 for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
00116 {
00117 const RtecEventComm::Event& event =
00118 pub.publications[j].event;
00119
00120
00121
00122
00123 if (supplier->can_match (event.header))
00124 {
00125
00126 this->collection_->connected (supplier);
00127 return;
00128 }
00129 }
00130 this->collection_->disconnected (supplier);
00131 }
00132
00133 void
00134 TAO_EC_Per_Supplier_Filter::disconnected (TAO_EC_ProxyPushSupplier* supplier)
00135 {
00136 this->collection_->disconnected (supplier);
00137 }
00138
00139 void
00140 TAO_EC_Per_Supplier_Filter::shutdown (void)
00141 {
00142 this->collection_->shutdown ();
00143 }
00144
00145 void
00146 TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event,
00147 TAO_EC_ProxyPushConsumer *consumer)
00148 {
00149 TAO_EC_Scheduling_Strategy* scheduling_strategy =
00150 this->event_channel_->scheduling_strategy ();
00151 scheduling_strategy->schedule_event (event,
00152 consumer,
00153 this);
00154 }
00155
00156 void
00157 TAO_EC_Per_Supplier_Filter::push_scheduled_event (RtecEventComm::EventSet &event,
00158 const TAO_EC_QOS_Info &event_info)
00159 {
00160 TAO_EC_Filter_Worker worker (event, event_info);
00161 this->collection_->for_each (&worker);
00162 }
00163
00164 CORBA::ULong
00165 TAO_EC_Per_Supplier_Filter::_incr_refcnt (void)
00166 {
00167 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
00168
00169 this->refcnt_++;
00170 return this->refcnt_;
00171 }
00172
00173 CORBA::ULong
00174 TAO_EC_Per_Supplier_Filter::_decr_refcnt (void)
00175 {
00176 {
00177 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
00178
00179 this->refcnt_--;
00180 if (this->refcnt_ != 0)
00181 return this->refcnt_;
00182 }
00183 this->event_channel_->supplier_filter_builder ()->destroy (this);
00184 return 0;
00185 }
00186
00187
00188
00189 TAO_EC_Supplier_Filter*
00190 TAO_EC_Per_Supplier_Filter_Builder::create (
00191 RtecEventChannelAdmin::SupplierQOS&)
00192 {
00193 return new TAO_EC_Per_Supplier_Filter (this->event_channel_);
00194 }
00195
00196 void
00197 TAO_EC_Per_Supplier_Filter_Builder::destroy (
00198 TAO_EC_Supplier_Filter* x)
00199 {
00200 delete x;
00201 }
00202
00203 TAO_END_VERSIONED_NAMESPACE_DECL