00001
00002
00003 #include "orbsvcs/AV/TCP.h"
00004 #include "orbsvcs/AV/AVStreams_i.h"
00005
00006 #include "tao/debug.h"
00007 #include "ace/OS_NS_strings.h"
00008
00009 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00010
00011
00012
00013
00014
00015 TAO_AV_TCP_Transport::TAO_AV_TCP_Transport (void)
00016 :handler_ (0)
00017 {
00018 }
00019
00020 TAO_AV_TCP_Transport::TAO_AV_TCP_Transport (TAO_AV_TCP_Flow_Handler *handler)
00021 :handler_ (handler)
00022 {
00023 }
00024
00025 TAO_AV_TCP_Transport::~TAO_AV_TCP_Transport (void)
00026 {
00027 }
00028
00029 int
00030 TAO_AV_TCP_Transport::open (ACE_Addr * )
00031 {
00032 return 0;
00033 }
00034
00035 int
00036 TAO_AV_TCP_Transport::close (void)
00037 {
00038 return 0;
00039 }
00040
00041 int
00042 TAO_AV_TCP_Transport::mtu (void)
00043 {
00044 return -1;
00045 }
00046
00047 ACE_Addr*
00048 TAO_AV_TCP_Transport::get_peer_addr (void)
00049 {
00050 return 0;
00051 }
00052
00053 ssize_t
00054 TAO_AV_TCP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
00055 {
00056
00057
00058
00059 iovec iov[ACE_IOV_MAX];
00060 int iovcnt = 0;
00061 ssize_t n = 0;
00062 ssize_t nbytes = 0;
00063
00064 for (const ACE_Message_Block *i = mblk;
00065 i != 0;
00066 i = i->cont ())
00067 {
00068
00069 if (i->length () > 0)
00070 {
00071 iov[iovcnt].iov_base = i->rd_ptr ();
00072 iov[iovcnt].iov_len = static_cast<u_long> (i->length ());
00073 iovcnt++;
00074
00075
00076
00077
00078
00079
00080
00081 if (iovcnt == ACE_IOV_MAX)
00082 {
00083 n = this->handler_->peer ().sendv_n ((const iovec *) iov,
00084 iovcnt);
00085 if (n < 1)
00086 return n;
00087
00088 nbytes += n;
00089 iovcnt = 0;
00090 }
00091 }
00092 }
00093
00094
00095 if (iovcnt != 0)
00096 {
00097 n = this->handler_->peer ().sendv_n ((const iovec *) iov,
00098 iovcnt);
00099 if (n < 1)
00100 return n;
00101
00102 nbytes += n;
00103 }
00104
00105 return nbytes;
00106 }
00107
00108 ssize_t
00109 TAO_AV_TCP_Transport::send (const char *buf,
00110 size_t len,
00111 ACE_Time_Value *)
00112 {
00113 return this->handler_->peer ().send_n (buf, len);
00114 }
00115
00116 ssize_t
00117 TAO_AV_TCP_Transport::send (const iovec *iov,
00118 int iovcnt,
00119 ACE_Time_Value *)
00120 {
00121 return this->handler_->peer ().sendv_n ((const iovec *) iov,
00122 iovcnt);
00123 }
00124
00125 ssize_t
00126 TAO_AV_TCP_Transport::recv (char *buf,
00127 size_t len,
00128 ACE_Time_Value *)
00129 {
00130 return this->handler_->peer ().recv (buf, len);
00131 }
00132
00133 ssize_t
00134 TAO_AV_TCP_Transport::recv (char *buf,
00135 size_t len,
00136 int flags,
00137 ACE_Time_Value *)
00138 {
00139 return this->handler_->peer ().recv (buf,
00140 len,
00141 flags);
00142 }
00143
00144 ssize_t
00145 TAO_AV_TCP_Transport::recv (iovec *iov,
00146 int iovcnt,
00147 ACE_Time_Value *)
00148 {
00149 return handler_->peer ().recvv_n (iov, iovcnt);
00150 }
00151
00152
00153
00154
00155
00156 TAO_AV_TCP_Factory::TAO_AV_TCP_Factory (void)
00157 {
00158 }
00159
00160 TAO_AV_TCP_Factory::~TAO_AV_TCP_Factory (void)
00161 {
00162 }
00163
00164 int
00165 TAO_AV_TCP_Factory::init (int ,
00166 char * [])
00167 {
00168 return 0;
00169 }
00170
00171 int
00172 TAO_AV_TCP_Factory::match_protocol (const char *protocol_string)
00173 {
00174 if (ACE_OS::strcasecmp (protocol_string,"TCP") == 0)
00175 return 1;
00176 return 0;
00177 }
00178
00179 TAO_AV_Acceptor*
00180 TAO_AV_TCP_Factory::make_acceptor (void)
00181 {
00182 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_TCP_Factory::make_acceptor "));
00183 TAO_AV_Acceptor *acceptor = 0;
00184 ACE_NEW_RETURN (acceptor,
00185 TAO_AV_TCP_Acceptor,
00186 0);
00187 return acceptor;
00188 }
00189
00190 TAO_AV_Connector*
00191 TAO_AV_TCP_Factory::make_connector (void)
00192 {
00193 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_TCP_Factory::make_connector "));
00194 TAO_AV_Connector *connector = 0;
00195 ACE_NEW_RETURN (connector,
00196 TAO_AV_TCP_Connector,
00197 0);
00198 return connector;
00199 }
00200
00201
00202
00203
00204
00205 int
00206 TAO_AV_TCP_Object::handle_input (void)
00207 {
00208 int n = this->transport_->recv (this->frame_.rd_ptr (),
00209 this->frame_.size ());
00210 if (n == -1)
00211 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_Flow_Handler::handle_input recv failed\n"),-1);
00212 if (n == 0)
00213 ACE_ERROR_RETURN ((LM_DEBUG,"TAO_AV_TCP_Flow_Handler::handle_input connection closed\n"),-1);
00214 this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
00215
00216 return this->callback_->receive_frame (&this->frame_);
00217 }
00218
00219 int
00220 TAO_AV_TCP_Object::send_frame (ACE_Message_Block *frame,
00221 TAO_AV_frame_info * )
00222 {
00223 int result = this->transport_->send (frame);
00224 return result;
00225 }
00226
00227 int
00228 TAO_AV_TCP_Object::send_frame (const iovec *iov,
00229 int iovcnt,
00230 TAO_AV_frame_info * )
00231 {
00232 return this->transport_->send (iov,iovcnt);
00233 }
00234
00235 int
00236 TAO_AV_TCP_Object::send_frame (const char*buf,
00237 size_t len)
00238 {
00239 int result = this->transport_->send (buf, len, 0);
00240 return result;
00241 }
00242
00243
00244 TAO_AV_TCP_Object::TAO_AV_TCP_Object (TAO_AV_Callback *callback,
00245 TAO_AV_Transport *transport)
00246 :TAO_AV_Protocol_Object (callback,transport)
00247 {
00248
00249 this->frame_.size (BUFSIZ);
00250 }
00251
00252 TAO_AV_TCP_Object::~TAO_AV_TCP_Object (void)
00253 {
00254
00255 }
00256 int
00257 TAO_AV_TCP_Object::destroy (void)
00258 {
00259 this->callback_->handle_destroy ();
00260 delete this;
00261 return 0;
00262 }
00263
00264
00265
00266
00267 TAO_AV_TCP_Flow_Factory::TAO_AV_TCP_Flow_Factory (void)
00268 {
00269 }
00270
00271 TAO_AV_TCP_Flow_Factory::~TAO_AV_TCP_Flow_Factory (void)
00272 {
00273 }
00274
00275 int
00276 TAO_AV_TCP_Flow_Factory::init (int ,
00277 char * [])
00278 {
00279 return 0;
00280 }
00281
00282 int
00283 TAO_AV_TCP_Flow_Factory::match_protocol (const char *flow_string)
00284 {
00285 if (ACE_OS::strcasecmp (flow_string,"TCP") == 0)
00286 return 1;
00287 return 0;
00288 }
00289
00290 TAO_AV_Protocol_Object*
00291 TAO_AV_TCP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
00292 TAO_Base_StreamEndPoint *endpoint,
00293 TAO_AV_Flow_Handler *handler,
00294 TAO_AV_Transport *transport)
00295 {
00296 TAO_AV_Callback *callback = 0;
00297 if (endpoint->get_callback (entry->flowname (), callback))
00298 {
00299 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
00300 }
00301
00302 TAO_AV_TCP_Object *object = 0;
00303 ACE_NEW_RETURN (object,
00304 TAO_AV_TCP_Object (callback,
00305 transport),
00306 0);
00307 callback->open (object,
00308 handler);
00309 endpoint->set_protocol_object (entry->flowname (),
00310 object);
00311
00312 endpoint->protocol_object_set ();
00313
00314 return object;
00315 }
00316
00317
00318
00319
00320
00321 int
00322 TAO_AV_TCP_Base_Connector::connector_open (TAO_AV_TCP_Connector *connector,
00323 ACE_Reactor *reactor)
00324 {
00325 this->connector_ = connector;
00326 this->reactor_ = reactor;
00327
00328 int result = ACE_Connector <TAO_AV_TCP_Flow_Handler,ACE_SOCK_CONNECTOR>::open (reactor);
00329 if (result < 0)
00330 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_Base_Connector::open failed\n"),-1);
00331 return 0;
00332 }
00333
00334 int
00335 TAO_AV_TCP_Base_Connector::make_svc_handler (TAO_AV_TCP_Flow_Handler *&tcp_handler)
00336 {
00337 int const result =
00338 this->connector_->make_svc_handler (tcp_handler);
00339 if (result < 0)
00340 return result;
00341 tcp_handler->reactor (this->reactor_);
00342 return 0;
00343 }
00344
00345 int
00346 TAO_AV_TCP_Base_Connector::connector_connect (TAO_AV_TCP_Flow_Handler *&handler,
00347 const ACE_INET_Addr &remote_addr)
00348 {
00349 int const result =
00350 ACE_Connector <TAO_AV_TCP_Flow_Handler,ACE_SOCK_CONNECTOR>::connect (handler,
00351 remote_addr);
00352 if (result < 0)
00353 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_Base_Connector::connect failed\n"),-1);
00354 return 0;
00355 }
00356
00357
00358
00359
00360 TAO_AV_TCP_Connector::TAO_AV_TCP_Connector (void)
00361 {
00362 }
00363
00364 TAO_AV_TCP_Connector::~TAO_AV_TCP_Connector (void)
00365 {
00366 }
00367
00368 int
00369 TAO_AV_TCP_Connector::make_svc_handler (TAO_AV_TCP_Flow_Handler *&tcp_handler)
00370 {
00371 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_TCP_Connector::make_svc_handler\n"));
00372
00373 if (this->endpoint_ != 0)
00374 {
00375
00376
00377 ACE_NEW_RETURN (tcp_handler,
00378
00379 TAO_AV_TCP_Flow_Handler,
00380 -1);
00381 TAO_AV_Protocol_Object *object =
00382 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00383 this->endpoint_,
00384 tcp_handler,
00385 tcp_handler->transport ());
00386 tcp_handler->protocol_object (object);
00387
00388
00389
00390 this->endpoint_->set_flow_handler (this->flowname_.c_str (),tcp_handler);
00391 this->entry_->protocol_object (object);
00392 this->entry_->handler (tcp_handler);
00393 }
00394 return 0;
00395 }
00396
00397 int
00398 TAO_AV_TCP_Connector::open (TAO_Base_StreamEndPoint *endpoint,
00399 TAO_AV_Core *av_core,
00400 TAO_AV_Flow_Protocol_Factory *factory)
00401
00402 {
00403 this->endpoint_ = endpoint;
00404 this->flow_protocol_factory_ = factory;
00405 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_TCP_Connector::open "));
00406 int result = this->connector_.connector_open(this,
00407 av_core->reactor ());
00408 return result;
00409 }
00410
00411 int
00412 TAO_AV_TCP_Connector::connect (TAO_FlowSpec_Entry *entry,
00413 TAO_AV_Transport *&transport,
00414 TAO_AV_Core::Flow_Component flow_comp)
00415 {
00416 this->entry_ = entry;
00417 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00418 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00419 else
00420 this->flowname_ = entry->flowname ();
00421 ACE_Addr *remote_addr = entry->address ();
00422 ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr *> (remote_addr);
00423 TAO_AV_TCP_Flow_Handler *handler = 0;
00424 int result = this->connector_.connector_connect (handler,
00425 *inet_addr);
00426 if (result < 0)
00427 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_connector::connect failed\n"),-1);
00428 entry->handler (handler);
00429 transport = handler->transport ();
00430 return 0;
00431 }
00432
00433 int
00434 TAO_AV_TCP_Connector::close (void)
00435 {
00436 return 0;
00437 }
00438
00439
00440
00441
00442
00443 int
00444 TAO_AV_TCP_Base_Acceptor::acceptor_open (TAO_AV_TCP_Acceptor *acceptor,
00445 ACE_Reactor *reactor,
00446 const ACE_INET_Addr &local_addr,
00447 TAO_FlowSpec_Entry *entry)
00448 {
00449 this->acceptor_ = acceptor;
00450 this->reactor_ = reactor;
00451 this->entry_ = entry;
00452
00453 int const result =
00454 ACE_Acceptor <TAO_AV_TCP_Flow_Handler,ACE_SOCK_ACCEPTOR>::open (local_addr,reactor);
00455 if (result < 0)
00456 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_Base_Connector::open failed\n"),-1);
00457 return 0;
00458 }
00459
00460 int
00461 TAO_AV_TCP_Base_Acceptor::make_svc_handler (TAO_AV_TCP_Flow_Handler *&handler)
00462 {
00463 int const result = this->acceptor_->make_svc_handler (handler);
00464 if (result < 0)
00465 return result;
00466 handler->reactor (this->reactor_);
00467 this->entry_->handler (handler);
00468 return 0;
00469 }
00470
00471
00472
00473
00474
00475
00476 TAO_AV_TCP_Acceptor::TAO_AV_TCP_Acceptor (void)
00477 {
00478 }
00479
00480 TAO_AV_TCP_Acceptor::~TAO_AV_TCP_Acceptor (void)
00481 {
00482 }
00483
00484 int
00485 TAO_AV_TCP_Acceptor::make_svc_handler (TAO_AV_TCP_Flow_Handler *&tcp_handler)
00486 {
00487 if (TAO_debug_level > 0)
00488 ACE_DEBUG ((LM_DEBUG,
00489 "TAO_AV_TCP_Acceptor::make_svc_handler\n"
00490 ));
00491
00492 if (this->endpoint_ != 0)
00493 {
00494 ACE_NEW_RETURN (tcp_handler,
00495 TAO_AV_TCP_Flow_Handler,
00496 -1);
00497
00498 TAO_AV_Protocol_Object *object =
00499 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00500 this->endpoint_,
00501 tcp_handler,
00502 tcp_handler->transport ());
00503
00504 tcp_handler->protocol_object (object);
00505
00506 this->endpoint_->set_flow_handler (this->flowname_.c_str (),tcp_handler);
00507 this->entry_->protocol_object (object);
00508 this->entry_->handler (tcp_handler);
00509 }
00510 return 0;
00511 }
00512
00513 int
00514 TAO_AV_TCP_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00515 TAO_AV_Core *av_core,
00516 TAO_FlowSpec_Entry *entry,
00517 TAO_AV_Flow_Protocol_Factory *factory,
00518 TAO_AV_Core::Flow_Component flow_comp)
00519 {
00520 this->flow_protocol_factory_ = factory;
00521
00522 if (TAO_debug_level > 0)
00523 ACE_DEBUG ((LM_DEBUG,
00524 "TAO_AV_TCP_Acceptor::open "));
00525
00526 this->av_core_ = av_core;
00527 this->endpoint_ = endpoint;
00528 this->entry_ = entry;
00529 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00530 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00531 else
00532 this->flowname_ = entry->flowname ();
00533 ACE_Addr *address = entry->address ();
00534
00535 ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) address;
00536
00537 inet_addr->set (inet_addr->get_port_number (),
00538 inet_addr->get_host_name ());
00539
00540 char buf[BUFSIZ];
00541 inet_addr->addr_to_string (buf,
00542 BUFSIZ);
00543
00544 if (TAO_debug_level > 0)
00545 ACE_DEBUG ((LM_DEBUG,
00546 "TAO_AV_TCP_Acceptor::open: %s",
00547 buf
00548 ));
00549
00550 int result = this->acceptor_.acceptor_open (this,
00551 av_core->reactor (),
00552 *inet_addr,
00553 entry);
00554
00555 if (result < 0)
00556 ACE_ERROR_RETURN ((LM_ERROR,
00557 "TAO_AV_TCP_Acceptor::open failed"),
00558 -1);
00559
00560 entry->set_local_addr (address);
00561 return 0;
00562 }
00563
00564 int
00565 TAO_AV_TCP_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00566 TAO_AV_Core *av_core,
00567 TAO_FlowSpec_Entry *entry,
00568 TAO_AV_Flow_Protocol_Factory *factory,
00569 TAO_AV_Core::Flow_Component flow_comp)
00570 {
00571 this->flow_protocol_factory_ = factory;
00572 this->av_core_ = av_core;
00573 this->endpoint_ = endpoint;
00574 this->entry_ = entry;
00575 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00576 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
00577 else
00578 this->flowname_ = entry->flowname ();
00579
00580 ACE_INET_Addr *address = 0;
00581 ACE_NEW_RETURN (address,
00582 ACE_INET_Addr ("0"),
00583 -1);
00584
00585 int result = this->acceptor_.acceptor_open (this,
00586 av_core->reactor (),
00587 *address,
00588 entry);
00589
00590
00591 if (result < 0)
00592 ACE_ERROR_RETURN ((LM_ERROR,
00593 "TAO_AV_TCP_Acceptor::open failed"),
00594 -1);
00595
00596 this->acceptor_.acceptor ().get_local_addr (*address);
00597
00598 address->set (address->get_port_number (),
00599 address->get_host_name ());
00600
00601 char buf[BUFSIZ];
00602 address->addr_to_string (buf,BUFSIZ);
00603
00604 if (TAO_debug_level > 0)
00605 ACE_DEBUG ((LM_DEBUG,
00606 "TAO_AV_TCP_Acceptor::open_default: %s\n",
00607 buf));
00608
00609 entry->set_local_addr (address);
00610
00611 return 0;
00612 }
00613
00614
00615 int
00616 TAO_AV_TCP_Acceptor::close (void)
00617 {
00618 return 0;
00619 }
00620
00621
00622
00623
00624
00625 TAO_AV_TCP_Flow_Handler::TAO_AV_TCP_Flow_Handler (TAO_AV_Callback * )
00626
00627 {
00628 ACE_NEW (this->transport_,
00629 TAO_AV_TCP_Transport (this));
00630 }
00631
00632 TAO_AV_TCP_Flow_Handler::~TAO_AV_TCP_Flow_Handler (void)
00633 {
00634 delete this->transport_;
00635 }
00636
00637 TAO_AV_Transport *
00638 TAO_AV_TCP_Flow_Handler::transport (void)
00639 {
00640 return this->transport_;
00641 }
00642
00643 int
00644 TAO_AV_TCP_Flow_Handler::open (void * )
00645 {
00646
00647 #if defined (TCP_NODELAY)
00648 int nodelay = 1;
00649
00650 if (this->peer ().set_option (IPPROTO_TCP,
00651 TCP_NODELAY,
00652 (void *) &nodelay,
00653 sizeof (nodelay)) == -1)
00654 ACE_ERROR_RETURN ((LM_ERROR,
00655 "NODELAY failed\n"),
00656 -1);
00657 #endif
00658
00659
00660 int bufSize=BUFSIZ;
00661 int s= sizeof (bufSize);
00662 if( this->peer ().get_option (SOL_SOCKET,
00663 SO_RCVBUF,
00664 (void *) &bufSize,&s
00665 ) == -1)
00666 bufSize=BUFSIZ;
00667
00668
00669 ((TAO_AV_TCP_Object*)(this->protocol_object_))->frame_.size (bufSize);
00670
00671
00672
00673
00674 ACE_INET_Addr addr;
00675
00676 if (this->peer ().get_remote_addr (addr) == -1)
00677 return -1;
00678
00679 char server[MAXHOSTNAMELEN + 16];
00680
00681 (void) addr.addr_to_string (server, sizeof (server));
00682
00683 if (TAO_debug_level > 0)
00684 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00685 "(%P|%t) connection to server <%s> on %d\n",
00686 server, this->peer ().get_handle ()));
00687
00688 this->peer ().enable (ACE_NONBLOCK);
00689
00690 if (this->reactor ()
00691 && this->reactor ()->register_handler
00692 (this,
00693 ACE_Event_Handler::READ_MASK) == -1)
00694 ACE_ERROR_RETURN ((LM_ERROR,
00695 ACE_TEXT ("%p\n"),
00696 ACE_TEXT ("unable to register client handler")),
00697 -1);
00698 return 0;
00699 }
00700
00701 int
00702 TAO_AV_TCP_Flow_Handler::handle_input (ACE_HANDLE )
00703 {
00704 return this->protocol_object_->handle_input ();
00705 }
00706
00707 int
00708 TAO_AV_TCP_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00709 const void *arg)
00710 {
00711 return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00712 }
00713
00714 TAO_END_VERSIONED_NAMESPACE_DECL
00715
00716 ACE_STATIC_SVC_DEFINE (TAO_AV_TCP_Flow_Factory,
00717 ACE_TEXT ("TCP_Flow_Factory"),
00718 ACE_SVC_OBJ_T,
00719 &ACE_SVC_NAME (TAO_AV_TCP_Flow_Factory),
00720 ACE_Service_Type::DELETE_THIS |
00721 ACE_Service_Type::DELETE_OBJ,
00722 0)
00723
00724 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_TCP_Flow_Factory)
00725
00726 ACE_STATIC_SVC_DEFINE (TAO_AV_TCP_Factory,
00727 ACE_TEXT ("TCP_Factory"),
00728 ACE_SVC_OBJ_T,
00729 &ACE_SVC_NAME (TAO_AV_TCP_Factory),
00730 ACE_Service_Type::DELETE_THIS |
00731 ACE_Service_Type::DELETE_OBJ,
00732 0)
00733
00734 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_TCP_Factory)