00001
00002
00003 #include "orbsvcs/CosEvent/CEC_Dispatching_Task.h"
00004
00005 #include "tao/ORB_Constants.h"
00006 #include "ace/OS_NS_errno.h"
00007
00008
00009 #if ! defined (__ACE_INLINE__)
00010 #include "orbsvcs/CosEvent/CEC_Dispatching_Task.inl"
00011 #endif
00012
00013 #include "ace/OS_NS_errno.h"
00014
00015 ACE_RCSID (CosEvent,
00016 CEC_Dispatching,
00017 "$Id: CEC_Dispatching_Task.cpp 76589 2007-01-25 18:04:11Z elliott_c $")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 int
00022 TAO_CEC_Dispatching_Task::svc (void)
00023 {
00024 int done = 0;
00025 while (!done)
00026 {
00027 try
00028 {
00029 ACE_Message_Block *mb;
00030 if (this->getq (mb) == -1)
00031 if (ACE_OS::last_error () == ESHUTDOWN)
00032 return 0;
00033 else
00034 ACE_ERROR ((LM_ERROR,
00035 "EC (%P|%t) getq error in Dispatching Queue\n"));
00036
00037 TAO_CEC_Dispatch_Command *command =
00038 dynamic_cast<TAO_CEC_Dispatch_Command*> (mb);
00039
00040 if (command == 0)
00041 {
00042 ACE_Message_Block::release (mb);
00043 continue;
00044 }
00045
00046 int result = command->execute ();
00047
00048 ACE_Message_Block::release (mb);
00049
00050 if (result == -1)
00051 done = 1;
00052 }
00053 catch (const CORBA::Exception& ex)
00054 {
00055 ex._tao_print_exception ("EC (%P|%t) exception in dispatching queue");
00056 }
00057 }
00058 return 0;
00059 }
00060
00061 void
00062 TAO_CEC_Dispatching_Task::push (TAO_CEC_ProxyPushSupplier *proxy,
00063 CORBA::Any& event)
00064 {
00065 if (this->allocator_ == 0)
00066 this->allocator_ = ACE_Allocator::instance ();
00067
00068 void* buf = this->allocator_->malloc (sizeof (TAO_CEC_Push_Command));
00069
00070 if (buf == 0)
00071 throw CORBA::NO_MEMORY (TAO::VMCID, CORBA::COMPLETED_NO);
00072
00073 ACE_Message_Block *mb =
00074 new (buf) TAO_CEC_Push_Command (proxy,
00075 event,
00076 this->data_block_.duplicate (),
00077 this->allocator_);
00078 this->putq (mb);
00079 }
00080
00081 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00082 void
00083 TAO_CEC_Dispatching_Task::invoke (TAO_CEC_ProxyPushSupplier *proxy,
00084 TAO_CEC_TypedEvent& typed_event)
00085 {
00086 if (this->allocator_ == 0)
00087 this->allocator_ = ACE_Allocator::instance ();
00088
00089 void* buf = this->allocator_->malloc (sizeof (TAO_CEC_Invoke_Command));
00090
00091 if (buf == 0)
00092 throw CORBA::NO_MEMORY (TAO::VMCID, CORBA::COMPLETED_NO);
00093
00094 ACE_Message_Block *mb =
00095 new (buf) TAO_CEC_Invoke_Command (proxy,
00096 typed_event,
00097 this->data_block_.duplicate (),
00098 this->allocator_);
00099 this->putq (mb);
00100 }
00101 #endif
00102
00103
00104
00105 TAO_CEC_Dispatch_Command::~TAO_CEC_Dispatch_Command (void)
00106 {
00107 }
00108
00109
00110
00111 int
00112 TAO_CEC_Shutdown_Task_Command::execute (void)
00113 {
00114 return -1;
00115 }
00116
00117
00118
00119 TAO_CEC_Push_Command::~TAO_CEC_Push_Command (void)
00120 {
00121 this->proxy_->_decr_refcnt ();
00122 }
00123
00124 int
00125 TAO_CEC_Push_Command::execute (void)
00126 {
00127 this->proxy_->push_to_consumer (this->event_);
00128 return 0;
00129 }
00130
00131
00132
00133 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00134 TAO_CEC_Invoke_Command::~TAO_CEC_Invoke_Command (void)
00135 {
00136 this->proxy_->_decr_refcnt ();
00137 }
00138
00139 int
00140 TAO_CEC_Invoke_Command::execute (void)
00141 {
00142 this->proxy_->invoke_to_consumer (this->typed_event_);
00143 return 0;
00144 }
00145 #endif
00146
00147 TAO_END_VERSIONED_NAMESPACE_DECL