00001
00002
00003 #include "orbsvcs/CosEvent/CEC_Dispatching_Task.h"
00004
00005 #include "tao/ORB_Constants.h"
00006 #include "ace/OS_NS_errno.h"
00007
00008
00009 #if ! defined (__ACE_INLINE__)
00010 #include "orbsvcs/CosEvent/CEC_Dispatching_Task.i"
00011 #endif
00012
00013 #include "ace/OS_NS_errno.h"
00014
00015 ACE_RCSID (CosEvent,
00016 CEC_Dispatching,
00017 "CEC_Dispatching_Task.cpp,v 1.17 2006/03/14 06:14:24 jtc Exp")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 int
00022 TAO_CEC_Dispatching_Task::svc (void)
00023 {
00024 int done = 0;
00025 while (!done)
00026 {
00027 ACE_TRY_NEW_ENV
00028 {
00029 ACE_Message_Block *mb;
00030 if (this->getq (mb) == -1)
00031 if (ACE_OS::last_error () == ESHUTDOWN)
00032 return 0;
00033 else
00034 ACE_ERROR ((LM_ERROR,
00035 "EC (%P|%t) getq error in Dispatching Queue\n"));
00036
00037 TAO_CEC_Dispatch_Command *command =
00038 dynamic_cast<TAO_CEC_Dispatch_Command*> (mb);
00039
00040 if (command == 0)
00041 {
00042 ACE_Message_Block::release (mb);
00043 continue;
00044 }
00045
00046 int result = command->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00047 ACE_TRY_CHECK;
00048
00049 ACE_Message_Block::release (mb);
00050
00051 if (result == -1)
00052 done = 1;
00053 }
00054 ACE_CATCHANY
00055 {
00056 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00057 "EC (%P|%t) exception in dispatching queue");
00058 }
00059 ACE_ENDTRY;
00060 }
00061 return 0;
00062 }
00063
00064 void
00065 TAO_CEC_Dispatching_Task::push (TAO_CEC_ProxyPushSupplier *proxy,
00066 CORBA::Any& event
00067 ACE_ENV_ARG_DECL)
00068 {
00069 if (this->allocator_ == 0)
00070 this->allocator_ = ACE_Allocator::instance ();
00071
00072 void* buf = this->allocator_->malloc (sizeof (TAO_CEC_Push_Command));
00073
00074 if (buf == 0)
00075 ACE_THROW (CORBA::NO_MEMORY (TAO::VMCID,
00076 CORBA::COMPLETED_NO));
00077
00078 ACE_Message_Block *mb =
00079 new (buf) TAO_CEC_Push_Command (proxy,
00080 event,
00081 this->data_block_.duplicate (),
00082 this->allocator_);
00083 this->putq (mb);
00084 }
00085
00086 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00087 void
00088 TAO_CEC_Dispatching_Task::invoke (TAO_CEC_ProxyPushSupplier *proxy,
00089 TAO_CEC_TypedEvent& typed_event
00090 ACE_ENV_ARG_DECL)
00091 {
00092 if (this->allocator_ == 0)
00093 this->allocator_ = ACE_Allocator::instance ();
00094
00095 void* buf = this->allocator_->malloc (sizeof (TAO_CEC_Invoke_Command));
00096
00097 if (buf == 0)
00098 ACE_THROW (CORBA::NO_MEMORY (TAO::VMCID,
00099 CORBA::COMPLETED_NO));
00100
00101 ACE_Message_Block *mb =
00102 new (buf) TAO_CEC_Invoke_Command (proxy,
00103 typed_event,
00104 this->data_block_.duplicate (),
00105 this->allocator_);
00106 this->putq (mb);
00107 }
00108 #endif
00109
00110
00111
00112 TAO_CEC_Dispatch_Command::~TAO_CEC_Dispatch_Command (void)
00113 {
00114 }
00115
00116
00117
00118 int
00119 TAO_CEC_Shutdown_Task_Command::execute (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00120 {
00121 return -1;
00122 }
00123
00124
00125
00126 TAO_CEC_Push_Command::~TAO_CEC_Push_Command (void)
00127 {
00128 this->proxy_->_decr_refcnt ();
00129 }
00130
00131 int
00132 TAO_CEC_Push_Command::execute (ACE_ENV_SINGLE_ARG_DECL)
00133 {
00134 this->proxy_->push_to_consumer (this->event_ ACE_ENV_ARG_PARAMETER);
00135 ACE_CHECK_RETURN (-1);
00136 return 0;
00137 }
00138
00139
00140
00141 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00142 TAO_CEC_Invoke_Command::~TAO_CEC_Invoke_Command (void)
00143 {
00144 this->proxy_->_decr_refcnt ();
00145 }
00146
00147 int
00148 TAO_CEC_Invoke_Command::execute (ACE_ENV_SINGLE_ARG_DECL)
00149 {
00150 this->proxy_->invoke_to_consumer (this->typed_event_ ACE_ENV_ARG_PARAMETER);
00151 ACE_CHECK_RETURN (-1);
00152 return 0;
00153 }
00154 #endif
00155
00156 TAO_END_VERSIONED_NAMESPACE_DECL