00001
00002
00003 #include "orbsvcs/Event/EC_Dispatching_Task.h"
00004 #include "orbsvcs/Event/EC_ProxySupplier.h"
00005 #include "orbsvcs/Event/EC_Defaults.h"
00006
00007 #include "tao/ORB_Constants.h"
00008 #include "ace/OS_NS_errno.h"
00009 #include "ace/OS_NS_strings.h"
00010
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/Event/EC_Dispatching_Task.inl"
00013 #endif
00014
00015 ACE_RCSID (Event,
00016 EC_Dispatching,
00017 "$Id: EC_Dispatching_Task.cpp 77613 2007-03-08 17:59:53Z fields_t $")
00018
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_EC_Simple_Queue_Full_Action::TAO_EC_Simple_Queue_Full_Action (void)
00023 : queue_full_action_return_value_ (WAIT_TO_EMPTY)
00024 {
00025 }
00026
00027
00028
00029 int
00030 TAO_EC_Simple_Queue_Full_Action::init_svcs (void)
00031 {
00032 return ACE_Service_Config::static_svcs ()->
00033 insert (&ace_svc_desc_TAO_EC_Simple_Queue_Full_Action);
00034 }
00035
00036 int
00037 TAO_EC_Simple_Queue_Full_Action::init (int argc, char* argv[])
00038 {
00039
00040
00041
00042
00043
00044
00045
00046 do {
00047 if (argc == 0)
00048 break;
00049
00050 if (ACE_OS::strcasecmp ("wait", argv[0]) == 0)
00051 this->queue_full_action_return_value_ = WAIT_TO_EMPTY;
00052 else if (ACE_OS::strcasecmp ("discard", argv[0]) == 0)
00053 this->queue_full_action_return_value_ = SILENTLY_DISCARD;
00054 #if 0
00055 else
00056 ;
00057
00058 #endif
00059 } while (0);
00060
00061 return 0;
00062 }
00063
00064 int
00065 TAO_EC_Simple_Queue_Full_Action::fini (void)
00066 {
00067 return 0;
00068 }
00069
00070 int
00071 TAO_EC_Simple_Queue_Full_Action::queue_full_action (TAO_EC_Dispatching_Task * ,
00072 TAO_EC_ProxyPushSupplier * ,
00073 RtecEventComm::PushConsumer_ptr ,
00074 RtecEventComm::EventSet& )
00075 {
00076 return this->queue_full_action_return_value_;
00077 }
00078
00079 TAO_END_VERSIONED_NAMESPACE_DECL
00080
// Static-service registration for TAO_EC_Simple_Queue_Full_Action, under the
// name TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME.
// NOTE(review): presumably this macro is what defines the
// ace_svc_desc_TAO_EC_Simple_Queue_Full_Action object that init_svcs()
// inserts into the static service list, and the trailing 0 is presumably the
// "active" flag -- confirm against the ACE_STATIC_SVC_DEFINE documentation.
00081 ACE_STATIC_SVC_DEFINE (TAO_EC_Simple_Queue_Full_Action,
00082 ACE_TEXT (TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME),
00083 ACE_SVC_OBJ_T,
00084 &ACE_SVC_NAME (TAO_EC_Simple_Queue_Full_Action),
00085 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00086 0)
// Factory definition for the same class, tagged with the TAO_RTEvent_Serv
// library prefix.
// NOTE(review): presumably this provides the function named by ACE_SVC_NAME
// above -- confirm against the ACE_FACTORY_DEFINE documentation.
00087 ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_EC_Simple_Queue_Full_Action)
00088
00089
00090 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00091 int
00092 TAO_EC_Queue::is_full_i (void)
00093 {
00094 return static_cast<size_t> (this->cur_count_) > this->high_water_mark_;
00095 }
00096
00097
00098
00099 int
00100 TAO_EC_Dispatching_Task::svc (void)
00101 {
00102 int done = 0;
00103 while (!done)
00104 {
00105 try
00106 {
00107 ACE_Message_Block *mb = 0;
00108 if (this->getq (mb) == -1)
00109 if (ACE_OS::last_error () == ESHUTDOWN)
00110 return 0;
00111 else
00112 ACE_ERROR ((LM_ERROR,
00113 "EC (%P|%t) getq error in Dispatching Queue\n"));
00114
00115 TAO_EC_Dispatch_Command *command =
00116 dynamic_cast<TAO_EC_Dispatch_Command*> (mb);
00117
00118 if (command == 0)
00119 {
00120 ACE_Message_Block::release (mb);
00121 continue;
00122 }
00123
00124 int result = command->execute ();
00125
00126 ACE_Message_Block::release (mb);
00127
00128 if (result == -1)
00129 done = 1;
00130 }
00131 catch (const CORBA::Exception& ex)
00132 {
00133 ex._tao_print_exception ("EC (%P|%t) exception in dispatching queue");
00134 }
00135 }
00136 return 0;
00137 }
00138
00139 void
00140 TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy,
00141 RtecEventComm::PushConsumer_ptr consumer,
00142 RtecEventComm::EventSet& event)
00143 {
00144 if (this->msg_queue()->is_full ())
00145 {
00146 if (0 != this->queue_full_service_object_)
00147 {
00148 int action =
00149 this->queue_full_service_object_->queue_full_action (this, proxy,
00150 consumer, event);
00151
00152 if (action == TAO_EC_Queue_Full_Service_Object::SILENTLY_DISCARD)
00153 return;
00154
00155 }
00156
00157 }
00158
00159 if (this->allocator_ == 0)
00160 this->allocator_ = ACE_Allocator::instance ();
00161
00162 void* buf = this->allocator_->malloc (sizeof (TAO_EC_Push_Command));
00163
00164 if (buf == 0)
00165 throw CORBA::NO_MEMORY (TAO::VMCID, CORBA::COMPLETED_NO);
00166
00167 ACE_Message_Block *mb =
00168 new (buf) TAO_EC_Push_Command (proxy,
00169 consumer,
00170 event,
00171 this->data_block_.duplicate (),
00172 this->allocator_);
00173 this->putq (mb);
00174 }
00175
00176
00177
00178 TAO_EC_Dispatch_Command::~TAO_EC_Dispatch_Command (void)
00179 {
00180 }
00181
00182
00183
00184 int
00185 TAO_EC_Shutdown_Task_Command::execute (void)
00186 {
00187 return -1;
00188 }
00189
00190
00191
00192 TAO_EC_Push_Command::~TAO_EC_Push_Command (void)
00193 {
00194 this->proxy_->_decr_refcnt ();
00195 }
00196
00197 int
00198 TAO_EC_Push_Command::execute (void)
00199 {
00200 this->proxy_->push_to_consumer (this->consumer_.in (),
00201 this->event_);
00202 return 0;
00203 }
00204
00205 TAO_END_VERSIONED_NAMESPACE_DECL