EC_Dispatching_Task.cpp

Go to the documentation of this file.
00001 // EC_Dispatching_Task.cpp,v 1.26 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_Dispatching_Task.h"
00004 #include "orbsvcs/Event/EC_ProxySupplier.h"
00005 #include "orbsvcs/Event/EC_Defaults.h"
00006 
00007 #include "tao/ORB_Constants.h"
00008 #include "ace/OS_NS_errno.h"
00009 #include "ace/OS_NS_strings.h"
00010 
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/Event/EC_Dispatching_Task.i"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 ACE_RCSID (Event,
00016            EC_Dispatching,
00017            "EC_Dispatching_Task.cpp,v 1.26 2006/03/14 06:14:25 jtc Exp")
00018 
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_EC_Simple_Queue_Full_Action::TAO_EC_Simple_Queue_Full_Action (void)
00023   : queue_full_action_return_value_ (WAIT_TO_EMPTY)
00024 {
00025 }
00026 
00027 /// Helper function to register the default action into the service
00028 /// configurator.
00029 int
00030 TAO_EC_Simple_Queue_Full_Action::init_svcs (void)
00031 {
00032   return ACE_Service_Config::static_svcs ()->
00033     insert (&ace_svc_desc_TAO_EC_Simple_Queue_Full_Action);
00034 }
00035 
00036 int
00037 TAO_EC_Simple_Queue_Full_Action::init (int argc, char* argv[])
00038 {
00039   // Here we look at the args and set an internal flag indicating whether
00040   // the default action should be to wait for the queue to not be full
00041   // or whether it should be to silently discard the event.
00042 
00043   // @@ This should use the arg shifter stuff, but let's keep it simple to
00044   // start.
00045 
00046   do {
00047     if (argc == 0)
00048       break;
00049 
00050     if (ACE_OS::strcasecmp ("wait", argv[0]) == 0)
00051       this->queue_full_action_return_value_ = WAIT_TO_EMPTY;
00052     else if (ACE_OS::strcasecmp ("discard", argv[0]) == 0)
00053       this->queue_full_action_return_value_ = SILENTLY_DISCARD;
00054 #if 0
00055     else
00056       ;
00057     // probably ought to print an error message here
00058 #endif
00059   } while (0);
00060 
00061   return 0;
00062 }
00063 
00064 int
00065 TAO_EC_Simple_Queue_Full_Action::fini (void)
00066 {
00067   return 0;
00068 }
00069 
00070 int
00071 TAO_EC_Simple_Queue_Full_Action::queue_full_action (TAO_EC_Dispatching_Task * /*task*/,
00072                                                     TAO_EC_ProxyPushSupplier * /*proxy*/,
00073                                                     RtecEventComm::PushConsumer_ptr /*consumer*/,
00074                                                     RtecEventComm::EventSet& /*event*/
00075                                                     ACE_ENV_ARG_DECL_NOT_USED)
00076 {
00077   return this->queue_full_action_return_value_;
00078 }
00079 
00080 TAO_END_VERSIONED_NAMESPACE_DECL
00081 
00082 ACE_STATIC_SVC_DEFINE (TAO_EC_Simple_Queue_Full_Action,
00083                        ACE_TEXT (TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME),
00084                        ACE_SVC_OBJ_T,
00085                        &ACE_SVC_NAME (TAO_EC_Simple_Queue_Full_Action),
00086                        ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00087                        0)
00088 ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_EC_Simple_Queue_Full_Action)
00089 
00090 
00091 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00092 int
00093 TAO_EC_Queue::is_full_i (void)
00094 {
00095   return static_cast<size_t> (this->cur_count_) > this->high_water_mark_;
00096 }
00097 
00098 // ****************************************************************
00099 
00100 int
00101 TAO_EC_Dispatching_Task::svc (void)
00102 {
00103   int done = 0;
00104   while (!done)
00105     {
00106       ACE_TRY_NEW_ENV
00107         {
00108           ACE_Message_Block *mb = 0;
00109           if (this->getq (mb) == -1)
00110             if (ACE_OS::last_error () == ESHUTDOWN)
00111               return 0;
00112           else
00113             ACE_ERROR ((LM_ERROR,
00114                         "EC (%P|%t) getq error in Dispatching Queue\n"));
00115 
00116           TAO_EC_Dispatch_Command *command =
00117             dynamic_cast<TAO_EC_Dispatch_Command*> (mb);
00118 
00119           if (command == 0)
00120             {
00121               ACE_Message_Block::release (mb);
00122               continue;
00123             }
00124 
00125           int result = command->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00126           ACE_TRY_CHECK;
00127 
00128           ACE_Message_Block::release (mb);
00129 
00130           if (result == -1)
00131             done = 1;
00132         }
00133       ACE_CATCHANY
00134         {
00135           ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00136                                "EC (%P|%t) exception in dispatching queue");
00137         }
00138       ACE_ENDTRY;
00139     }
00140   return 0;
00141 }
00142 
00143 void
00144 TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy,
00145                                RtecEventComm::PushConsumer_ptr consumer,
00146                                RtecEventComm::EventSet& event
00147                                ACE_ENV_ARG_DECL)
00148 {
00149   if (this->msg_queue()->is_full ())
00150     {
00151       int action =
00152         this->queue_full_service_object_->queue_full_action (this, proxy,
00153                                                              consumer, event
00154                                                              ACE_ENV_ARG_PARAMETER);
00155       ACE_CHECK;
00156 
00157       if (action == TAO_EC_Queue_Full_Service_Object::SILENTLY_DISCARD)
00158         return;
00159       // if action == WAIT_TO_EMPTY then we just go ahead and queue it
00160     }
00161 
00162   if (this->allocator_ == 0)
00163     this->allocator_ = ACE_Allocator::instance ();
00164 
00165   void* buf = this->allocator_->malloc (sizeof (TAO_EC_Push_Command));
00166 
00167   if (buf == 0)
00168     ACE_THROW (CORBA::NO_MEMORY (TAO::VMCID,
00169                                  CORBA::COMPLETED_NO));
00170 
00171   ACE_Message_Block *mb =
00172     new (buf) TAO_EC_Push_Command (proxy,
00173                                    consumer,
00174                                    event,
00175                                    this->data_block_.duplicate (),
00176                                    this->allocator_);
00177   this->putq (mb);
00178 }
00179 
00180 // ****************************************************************
00181 
00182 TAO_EC_Dispatch_Command::~TAO_EC_Dispatch_Command (void)
00183 {
00184 }
00185 
00186 // ****************************************************************
00187 
00188 int
00189 TAO_EC_Shutdown_Task_Command::execute (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00190 {
00191   return -1;
00192 }
00193 
00194 // ****************************************************************
00195 
00196 TAO_EC_Push_Command::~TAO_EC_Push_Command (void)
00197 {
00198   this->proxy_->_decr_refcnt ();
00199 }
00200 
00201 int
00202 TAO_EC_Push_Command::execute (ACE_ENV_SINGLE_ARG_DECL)
00203 {
00204   this->proxy_->push_to_consumer (this->consumer_.in (),
00205                                   this->event_
00206                                    ACE_ENV_ARG_PARAMETER);
00207   return 0;
00208 }
00209 
00210 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:07 2006 for TAO_RTEvent by doxygen 1.3.6