00001 // SequenceProxyPushConsumer.cpp,v 1.17 2006/03/15 21:29:09 jtc Exp 00002 00003 #include "orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h" 00004 00005 ACE_RCSID (Notify, TAO_Notify_SequenceProxyPushConsumer, "SequenceProxyPushConsumer.cpp,v 1.17 2006/03/15 21:29:09 jtc Exp") 00006 00007 #include "tao/debug.h" 00008 #include "orbsvcs/Notify/Sequence/SequencePushSupplier.h" 00009 #include "orbsvcs/Notify/AdminProperties.h" 00010 #include "orbsvcs/Notify/Structured/StructuredEvent.h" 00011 #include "orbsvcs/Notify/Properties.h" 00012 00013 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00014 00015 TAO_Notify_SequenceProxyPushConsumer::TAO_Notify_SequenceProxyPushConsumer (void) 00016 :pacing_interval_ (CosNotification::PacingInterval) 00017 { 00018 } 00019 00020 TAO_Notify_SequenceProxyPushConsumer::~TAO_Notify_SequenceProxyPushConsumer () 00021 { 00022 } 00023 00024 void 00025 TAO_Notify_SequenceProxyPushConsumer::release (void) 00026 { 00027 delete this; 00028 //@@ inform factory 00029 } 00030 00031 CosNotifyChannelAdmin::ProxyType 00032 TAO_Notify_SequenceProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) 00033 ACE_THROW_SPEC (( 00034 CORBA::SystemException 00035 )) 00036 { 00037 return CosNotifyChannelAdmin::PUSH_SEQUENCE; 00038 } 00039 00040 void 00041 TAO_Notify_SequenceProxyPushConsumer::connect_sequence_push_supplier (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) 00042 ACE_THROW_SPEC (( 00043 CORBA::SystemException 00044 , CosEventChannelAdmin::AlreadyConnected 00045 )) 00046 { 00047 // Convert Supplier to Base Type 00048 TAO_Notify_SequencePushSupplier *supplier; 00049 ACE_NEW_THROW_EX (supplier, 00050 TAO_Notify_SequencePushSupplier (this), 00051 CORBA::NO_MEMORY ()); 00052 00053 supplier->init (push_supplier ACE_ENV_ARG_PARAMETER); 00054 ACE_CHECK; 00055 00056 this->connect (supplier ACE_ENV_ARG_PARAMETER); 00057 ACE_CHECK; 00058 this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER); 00059 } 00060 00061 void 00062 TAO_Notify_SequenceProxyPushConsumer::push_structured_events (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL) 00063 ACE_THROW_SPEC (( 00064 CORBA::SystemException 00065 , CosEventComm::Disconnected 00066 )) 00067 { 00068 // Check if we should proceed at all. 00069 if (this->admin_properties().reject_new_events () == 1 && this->admin_properties().queue_full ()) 00070 ACE_THROW (CORBA::IMP_LIMIT ()); 00071 00072 if (this->is_connected () == 0) 00073 { 00074 ACE_THROW (CosEventComm::Disconnected ()); 00075 } 00076 00077 for (CORBA::ULong i = 0; i < event_batch.length (); ++i) 00078 { 00079 const CosNotification::StructuredEvent& notification = event_batch[i]; 00080 00081 TAO_Notify_StructuredEvent_No_Copy event (notification); 00082 this->push_i (&event ACE_ENV_ARG_PARAMETER); 00083 ACE_CHECK; 00084 } 00085 } 00086 00087 void 00088 TAO_Notify_SequenceProxyPushConsumer::disconnect_sequence_push_consumer (ACE_ENV_SINGLE_ARG_DECL) 00089 ACE_THROW_SPEC (( 00090 CORBA::SystemException 00091 )) 00092 { 00093 TAO_Notify_SequenceProxyPushConsumer::Ptr guard( this ); 00094 this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00095 ACE_CHECK; 00096 this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER); 00097 } 00098 00099 const char * 00100 TAO_Notify_SequenceProxyPushConsumer::get_proxy_type_name (void) const 00101 { 00102 return "sequence_proxy_push_consumer"; 00103 } 00104 00105 void 00106 TAO_Notify_SequenceProxyPushConsumer::load_attrs (const TAO_Notify::NVPList& attrs) 00107 { 00108 SuperClass::load_attrs(attrs); 00109 ACE_CString ior; 00110 if (attrs.load("PeerIOR", ior)) 00111 { 00112 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb(); 00113 ACE_DECLARE_NEW_CORBA_ENV; 00114 ACE_TRY 00115 { 00116 CosNotifyComm::SequencePushSupplier_var ps = CosNotifyComm::SequencePushSupplier::_nil(); 00117 if ( ior.length() > 0 ) 00118 { 00119 CORBA::Object_var obj = orb->string_to_object(ior.c_str() ACE_ENV_ARG_PARAMETER); 00120 ACE_TRY_CHECK; 00121 ps = CosNotifyComm::SequencePushSupplier::_unchecked_narrow(obj.in() ACE_ENV_ARG_PARAMETER); 00122 ACE_TRY_CHECK; 00123 } 00124 // minor hack: suppress generating subscription updates during reload. 00125 bool save_updates = this->updates_off_; 00126 this->updates_off_ = true; 00127 this->connect_sequence_push_supplier(ps.in() ACE_ENV_ARG_PARAMETER); 00128 ACE_TRY_CHECK; 00129 this->updates_off_ = save_updates; 00130 } 00131 ACE_CATCHANY 00132 { 00133 ACE_ASSERT(0); 00134 } 00135 ACE_ENDTRY; 00136 } 00137 } 00138 00139 TAO_END_VERSIONED_NAMESPACE_DECL