00001
00002
00003 #ifndef TAO_UIPMC_TRANSPORT_CPP
00004 #define TAO_UIPMC_TRANSPORT_CPP
00005
00006 #include "orbsvcs/PortableGroup/UIPMC_Profile.h"
00007 #include "orbsvcs/PortableGroup/UIPMC_Transport.h"
00008 #include "orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h"
00009 #include "orbsvcs/PortableGroup/UIPMC_Wait_Never.h"
00010
00011 #include "tao/Acceptor_Registry.h"
00012 #include "tao/operation_details.h"
00013 #include "tao/Timeprobe.h"
00014 #include "tao/CDR.h"
00015 #include "tao/Transport_Mux_Strategy.h"
00016 #include "tao/Wait_Strategy.h"
00017 #include "tao/Stub.h"
00018 #include "tao/ORB_Core.h"
00019 #include "tao/debug.h"
00020 #include "tao/Resume_Handle.h"
00021 #include "tao/GIOP_Message_Base.h"
00022
00023 ACE_RCSID (PortableGroup,
00024 UIPMC_Transport,
00025 "$Id: UIPMC_Transport.cpp 79151 2007-08-01 09:04:36Z johnnyw $")
00026
00027
00028
00029
00030
00031
00032
00033
00034 #define MIOP_MAX_FRAGMENTS (1)
00035 #define MIOP_MAX_HEADER_SIZE (272)
00036 #define MIOP_MAX_DGRAM_SIZE (ACE_MAX_DGRAM_SIZE)
00037
00038 #define MIOP_MAGIC_OFFSET (0)
00039 #define MIOP_VERSION_OFFSET (4)
00040 #define MIOP_FLAGS_OFFSET (5)
00041 #define MIOP_PACKET_LENGTH_OFFSET (6)
00042 #define MIOP_PACKET_NUMBER_OFFSET (8)
00043 #define MIOP_NUMBER_OF_PACKETS_OFFSET (12)
00044 #define MIOP_ID_LENGTH_OFFSET (16)
00045 #define MIOP_MIN_LENGTH_ID (0)
00046 #define MIOP_MAX_LENGTH_ID (252)
00047 #define MIOP_ID_DEFAULT_LENGTH (12)
00048 #define MIOP_ID_CONTENT_OFFSET (20)
00049 #define MIOP_HEADER_PADDING (0)
00050
00051 #define MIOP_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \
00052 + MIOP_ID_DEFAULT_LENGTH \
00053 + MIOP_HEADER_PADDING)
00054 #define MIOP_MIN_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \
00055 + MIOP_MIN_LENGTH_ID \
00056 + (8 - MIOP_MIN_LENGTH_ID) )
00057
00058 static const CORBA::Octet miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 };
00059
00060 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00061
00062 struct MIOP_Packet
00063 {
00064 iovec iov[ACE_IOV_MAX];
00065 int iovcnt;
00066 int length;
00067 };
00068
00069 template<typename CONNECTION_HANDLER>
00070 TAO_UIPMC_Transport<CONNECTION_HANDLER>::TAO_UIPMC_Transport (
00071 CONNECTION_HANDLER *handler,
00072 TAO_ORB_Core *orb_core
00073 )
00074 : TAO_Transport (IOP::TAG_UIPMC,
00075 orb_core)
00076 , connection_handler_ (handler)
00077 , messaging_object_ (0)
00078 {
00079
00080 ACE_NEW (this->messaging_object_,
00081 TAO_GIOP_Message_Base (orb_core,
00082 this,
00083 MIOP_MAX_DGRAM_SIZE));
00084
00085
00086
00087 delete this->ws_;
00088 ACE_NEW (this->ws_,
00089 TAO_UIPMC_Wait_Never (this));
00090 }
00091
00092 template<typename CONNECTION_HANDLER>
00093 TAO_UIPMC_Transport<CONNECTION_HANDLER>::~TAO_UIPMC_Transport (void)
00094 {
00095 delete this->messaging_object_;
00096 }
00097
00098 template<typename CONNECTION_HANDLER>
00099 ACE_Event_Handler *
00100 TAO_UIPMC_Transport<CONNECTION_HANDLER>::event_handler_i (void)
00101 {
00102 return this->connection_handler_;
00103 }
00104
00105 template<typename CONNECTION_HANDLER>
00106 TAO_Connection_Handler *
00107 TAO_UIPMC_Transport<CONNECTION_HANDLER>::connection_handler_i (void)
00108 {
00109 return this->connection_handler_;
00110 }
00111
00112 template<typename CONNECTION_HANDLER>
00113 TAO_Pluggable_Messaging *
00114 TAO_UIPMC_Transport<CONNECTION_HANDLER>::messaging_object (void)
00115 {
00116 return this->messaging_object_;
00117 }
00118
00119 template<typename CONNECTION_HANDLER>
00120 void
00121 TAO_UIPMC_Transport<CONNECTION_HANDLER>::write_unique_id (TAO_OutputCDR &miop_hdr,
00122 unsigned long unique)
00123 {
00124
00125
00126
00127
00128
00129 static unsigned long counter = 1;
00130
00131
00132
00133 CORBA::Octet unique_id[MIOP_ID_DEFAULT_LENGTH];
00134
00135 unique_id[0] = static_cast<CORBA::Octet> (unique & 0xff);
00136 unique_id[1] = static_cast<CORBA::Octet> ((unique & 0xff00) >> 8);
00137 unique_id[2] = static_cast<CORBA::Octet> ((unique & 0xff0000) >> 16);
00138 unique_id[3] = static_cast<CORBA::Octet> ((unique & 0xff000000) >> 24);
00139
00140 unique_id[4] = static_cast<CORBA::Octet> (counter & 0xff);
00141 unique_id[5] = static_cast<CORBA::Octet> ((counter & 0xff00) >> 8);
00142 unique_id[6] = static_cast<CORBA::Octet> ((counter & 0xff0000) >> 16);
00143 unique_id[7] = static_cast<CORBA::Octet> ((counter & 0xff000000) >> 24);
00144
00145 unique_id[8] = 0;
00146 unique_id[9] = 0;
00147 unique_id[10] = 0;
00148 unique_id[11] = 0;
00149
00150 miop_hdr.write_ulong (MIOP_ID_DEFAULT_LENGTH);
00151 miop_hdr.write_octet_array (unique_id, MIOP_ID_DEFAULT_LENGTH);
00152 }
00153
00154 template<typename CONNECTION_HANDLER>
00155 ssize_t
00156 TAO_UIPMC_Transport<CONNECTION_HANDLER>::send (iovec *iov, int iovcnt,
00157 size_t &bytes_transferred,
00158 const ACE_Time_Value *)
00159 {
00160 const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00161 bytes_transferred = 0;
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172 ssize_t bytes_to_send = 0;
00173 for (int i = 0; i < iovcnt; i++)
00174 bytes_to_send += iov[i].iov_len;
00175
00176 MIOP_Packet fragments[MIOP_MAX_FRAGMENTS];
00177 MIOP_Packet *current_fragment;
00178 int num_fragments = 1;
00179
00180 UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt);
00181
00182
00183 current_fragment = &fragments[0];
00184 current_fragment->iovcnt = 1;
00185 current_fragment->length = MIOP_HEADER_SIZE;
00186
00187
00188 while (mb_iter.next_block (MIOP_MAX_DGRAM_SIZE - current_fragment->length,
00189 current_fragment->iov[current_fragment->iovcnt]))
00190 {
00191
00192 current_fragment->length += current_fragment->iov[current_fragment->iovcnt].iov_len;
00193 current_fragment->iovcnt++;
00194
00195
00196
00197 if (current_fragment->length == MIOP_MAX_DGRAM_SIZE ||
00198 current_fragment->iovcnt == ACE_IOV_MAX)
00199 {
00200
00201 num_fragments++;
00202
00203
00204 if (num_fragments > MIOP_MAX_FRAGMENTS)
00205 {
00206
00207
00208
00209
00210
00211
00212
00213 if (TAO_debug_level > 0)
00214 {
00215 ACE_DEBUG ((LM_DEBUG,
00216 ACE_TEXT ("\n\nTAO (%P|%t) - ")
00217 ACE_TEXT ("UIPMC_Transport::send ")
00218 ACE_TEXT ("Message of size %d needs too many MIOP fragments (max is %d).\n")
00219 ACE_TEXT ("You may be able to increase ACE_MAX_DGRAM_SIZE.\n"),
00220 bytes_to_send,
00221 MIOP_MAX_FRAGMENTS));
00222 }
00223
00224
00225 bytes_transferred = bytes_to_send;
00226 return 1;
00227 }
00228
00229
00230 current_fragment++;
00231 current_fragment->iovcnt = 1;
00232 current_fragment->length = MIOP_HEADER_SIZE;
00233 }
00234 }
00235
00236
00237
00238
00239
00240 char header_buffer[MIOP_HEADER_SIZE + 8];
00241 TAO_OutputCDR miop_hdr (header_buffer, MIOP_HEADER_SIZE + 8);
00242
00243 miop_hdr.write_octet_array (miop_magic, 4);
00244 miop_hdr.write_octet (0x10);
00245 CORBA::Octet *flags_field = reinterpret_cast<CORBA::Octet *> (miop_hdr.current ()->wr_ptr ());
00246
00247
00248
00249
00250
00251
00252 miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER);
00253
00254
00255
00256
00257 CORBA::UShort *packet_length = reinterpret_cast<CORBA::UShort *> (miop_hdr.current ()->wr_ptr ());
00258 miop_hdr.write_short (0);
00259
00260
00261 CORBA::ULong *packet_number = reinterpret_cast<CORBA::ULong *> (miop_hdr.current ()->wr_ptr ());
00262 miop_hdr.write_ulong (0);
00263
00264
00265 miop_hdr.write_ulong (num_fragments);
00266
00267
00268 ptrdiff_t unique_id = reinterpret_cast<ptrdiff_t> (iov);
00269 this->write_unique_id (miop_hdr,
00270 static_cast<unsigned long> (unique_id));
00271
00272
00273 current_fragment = &fragments[0];
00274 while (num_fragments > 0 &&
00275 current_fragment->iovcnt > 1)
00276 {
00277
00278 *packet_length = static_cast<CORBA::UShort> (current_fragment->length);
00279
00280
00281 if (num_fragments == 1)
00282 {
00283 *flags_field |= 0x02;
00284 }
00285
00286
00287 current_fragment->iov[0].iov_base = miop_hdr.current ()->rd_ptr ();
00288 current_fragment->iov[0].iov_len = MIOP_HEADER_SIZE;
00289
00290
00291 ssize_t rc = this->connection_handler_->send (current_fragment->iov,
00292 current_fragment->iovcnt,
00293 addr);
00294
00295 if (rc <= 0)
00296 {
00297 if (TAO_debug_level > 0)
00298 {
00299 ACE_DEBUG ((LM_DEBUG,
00300 ACE_TEXT ("\n\nTAO (%P|%t) - ")
00301 ACE_TEXT ("UIPMC_Transport::send")
00302 ACE_TEXT (" %p\n\n"),
00303 ACE_TEXT ("Error returned from transport:")));
00304 }
00305
00306
00307 bytes_transferred = bytes_to_send;
00308 return 1;
00309 }
00310
00311
00312
00313 bytes_transferred += rc - MIOP_HEADER_SIZE;
00314
00315 if (TAO_debug_level > 0)
00316 {
00317 ACE_DEBUG ((LM_DEBUG,
00318 "TAO_UIPMC_Transport::send: sent %d bytes to %s:%d\n",
00319 rc,
00320 addr.get_host_addr (),
00321 addr.get_port_number ()));
00322 }
00323
00324
00325 (*packet_number)++;
00326 ++current_fragment;
00327 --num_fragments;
00328 }
00329
00330
00331 return bytes_transferred;
00332 }
00333
00334 template<typename CONNECTION_HANDLER>
00335 ssize_t
00336 TAO_UIPMC_Transport<CONNECTION_HANDLER>::recv (char *buf,
00337 size_t len,
00338 const ACE_Time_Value * )
00339 {
00340 ACE_INET_Addr from_addr;
00341
00342 ssize_t n = this->connection_handler_->peer ().recv (buf,
00343 len,
00344 from_addr);
00345 if (TAO_debug_level > 5)
00346 {
00347 ACE_DEBUG ((LM_DEBUG,
00348 "TAO_UIPMC_Transport::recv: received %d bytes from %s:%d\n",
00349 n,
00350 from_addr.get_host_addr (),
00351 from_addr.get_port_number ()));
00352 }
00353
00354
00355 if (n < MIOP_MIN_HEADER_SIZE)
00356 {
00357 if (TAO_debug_level > 0)
00358 {
00359 ACE_DEBUG ((LM_DEBUG,
00360 "TAO_UIPMC_Transport::recv: packet of size %d is too small from %s:%d\n",
00361 n,
00362 from_addr.get_host_addr (),
00363 from_addr.get_port_number ()));
00364 }
00365 return 0;
00366 }
00367
00368
00369 if (buf[MIOP_MAGIC_OFFSET] != miop_magic [0] ||
00370 buf[MIOP_MAGIC_OFFSET + 1] != miop_magic [1] ||
00371 buf[MIOP_MAGIC_OFFSET + 2] != miop_magic [2] ||
00372 buf[MIOP_MAGIC_OFFSET + 3] != miop_magic [3])
00373 {
00374 if (TAO_debug_level > 0)
00375 {
00376 ACE_DEBUG ((LM_DEBUG,
00377 "TAO_UIPMC_Transport::recv: UIPMC packet didn't contain magic bytes.\n"));
00378 }
00379
00380 return 0;
00381 }
00382
00383
00384
00385
00386 CORBA::Octet byte_order = buf[MIOP_FLAGS_OFFSET] & 0x01;
00387
00388
00389
00390
00391 CORBA::ULong id_length;
00392 #if !defined (ACE_DISABLE_SWAP_ON_READ)
00393 if (byte_order == ACE_CDR_BYTE_ORDER)
00394 {
00395 id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]);
00396 }
00397 else
00398 {
00399 ACE_CDR::swap_4 (&buf[MIOP_ID_LENGTH_OFFSET],
00400 reinterpret_cast<char*> (&id_length));
00401 }
00402 #else
00403 id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]);
00404 #endif
00405
00406
00407 if (id_length > MIOP_MAX_LENGTH_ID ||
00408 static_cast<ssize_t> (MIOP_ID_CONTENT_OFFSET + id_length) > n)
00409 {
00410 if (TAO_debug_level > 0)
00411 {
00412 ACE_DEBUG ((LM_DEBUG,
00413 "TAO_UIPMC_Transport::recv: Invalid ID length.\n"));
00414 }
00415
00416 return 0;
00417 }
00418
00419
00420 ssize_t miop_header_size = (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7;
00421 if (miop_header_size > n)
00422 {
00423 if (TAO_debug_level > 0)
00424 {
00425 ACE_DEBUG ((LM_DEBUG,
00426 "TAO_UIPMC_Transport::recv: MIOP packet not large enough for padding.\n"));
00427 }
00428
00429 return 0;
00430 }
00431
00432 n -= miop_header_size;
00433 ACE_OS::memmove (buf, buf + miop_header_size, n);
00434
00435 return n;
00436 }
00437
00438 template<typename CONNECTION_HANDLER>
00439 int
00440 TAO_UIPMC_Transport<CONNECTION_HANDLER>::handle_input (TAO_Resume_Handle &rh,
00441 ACE_Time_Value *max_wait_time)
00442 {
00443
00444
00445
00446
00447
00448 char buf [MIOP_MAX_DGRAM_SIZE];
00449
00450 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00451 (void) ACE_OS::memset (buf,
00452 '\0',
00453 sizeof buf);
00454 #endif
00455
00456
00457 ACE_Data_Block db (sizeof (buf),
00458 ACE_Message_Block::MB_DATA,
00459 buf,
00460 this->orb_core_->input_cdr_buffer_allocator (),
00461 this->orb_core_->locking_strategy (),
00462 ACE_Message_Block::DONT_DELETE,
00463 this->orb_core_->input_cdr_dblock_allocator ());
00464
00465
00466 ACE_Message_Block message_block (&db,
00467 ACE_Message_Block::DONT_DELETE,
00468 this->orb_core_->input_cdr_msgblock_allocator ());
00469
00470
00471
00472 ACE_CDR::mb_align (&message_block);
00473
00474
00475
00476
00477 ssize_t n = this->recv (message_block.rd_ptr (),
00478 message_block.space (),
00479 max_wait_time);
00480
00481
00482 if (n <= 0)
00483 {
00484 if (TAO_debug_level)
00485 {
00486 ACE_DEBUG ((LM_DEBUG,
00487 ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"),
00488 this->id (),
00489 ACE_TEXT ("handle_input ()\n")));
00490 }
00491
00492 if (n == -1)
00493 this->tms_->connection_closed ();
00494
00495 return n;
00496 }
00497
00498
00499 message_block.wr_ptr (n);
00500
00501
00502 TAO_Queued_Data qd (&message_block);
00503 size_t mesg_length = 0;
00504
00505
00506
00507 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1)
00508 {
00509 if (TAO_debug_level)
00510 {
00511 ACE_DEBUG ((LM_DEBUG,
00512 ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"),
00513 this->id () ));
00514 }
00515
00516 return -1;
00517 }
00518
00519 if (message_block.length () > mesg_length)
00520 {
00521 if (TAO_debug_level)
00522 {
00523 ACE_DEBUG ((LM_DEBUG,
00524 ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"),
00525 this->id () ));
00526 }
00527
00528 return -1;
00529 }
00530
00531
00532
00533
00534
00535
00536 return this->process_parsed_messages (&qd, rh);
00537 }
00538
00539 template<typename CONNECTION_HANDLER>
00540 int
00541 TAO_UIPMC_Transport<CONNECTION_HANDLER>::register_handler (void)
00542 {
00543
00544
00545
00546
00547
00548
00549
00550 return 0;
00551 }
00552
00553 template<typename CONNECTION_HANDLER>
00554 int
00555 TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_request (TAO_Stub *stub,
00556 TAO_ORB_Core *orb_core,
00557 TAO_OutputCDR &stream,
00558 int message_semantics,
00559 ACE_Time_Value *max_wait_time)
00560 {
00561 if (this->ws_->sending_request (orb_core,
00562 message_semantics) == -1)
00563 return -1;
00564
00565 if (this->send_message (stream,
00566 stub,
00567 message_semantics,
00568 max_wait_time) == -1)
00569
00570 return -1;
00571
00572 return 0;
00573 }
00574
00575 template<typename CONNECTION_HANDLER>
00576 int
00577 TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_message (TAO_OutputCDR &stream,
00578 TAO_Stub *stub,
00579 int message_semantics,
00580 ACE_Time_Value *max_wait_time)
00581 {
00582
00583 if (this->messaging_object_->format_message (stream) != 0)
00584 return -1;
00585
00586
00587
00588
00589
00590
00591 ssize_t n = this->send_message_shared (stub,
00592 message_semantics,
00593 stream.begin (),
00594 max_wait_time);
00595
00596 if (n == -1)
00597 {
00598 if (TAO_debug_level)
00599 ACE_DEBUG ((LM_DEBUG,
00600 ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %m\n"),
00601 this->id (),
00602 ACE_TEXT ("send_message ()\n")));
00603
00604 return -1;
00605 }
00606
00607 return 1;
00608 }
00609
00610 template<typename CONNECTION_HANDLER>
00611 int
00612 TAO_UIPMC_Transport<CONNECTION_HANDLER>::messaging_init (CORBA::Octet major,
00613 CORBA::Octet minor)
00614 {
00615 this->messaging_object_->init (major, minor);
00616 return 1;
00617 }
00618
00619 TAO_END_VERSIONED_NAMESPACE_DECL
00620
00621 #endif