00001
00002
00003 #ifndef TAO_UIPMC_TRANSPORT_CPP
00004 #define TAO_UIPMC_TRANSPORT_CPP
00005
00006 #include "orbsvcs/PortableGroup/UIPMC_Profile.h"
00007 #include "orbsvcs/PortableGroup/UIPMC_Transport.h"
00008 #include "orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h"
00009 #include "orbsvcs/PortableGroup/UIPMC_Wait_Never.h"
00010
00011 #include "tao/Acceptor_Registry.h"
00012 #include "tao/operation_details.h"
00013 #include "tao/Timeprobe.h"
00014 #include "tao/CDR.h"
00015 #include "tao/Transport_Mux_Strategy.h"
00016 #include "tao/Wait_Strategy.h"
00017 #include "tao/Stub.h"
00018 #include "tao/ORB_Core.h"
00019 #include "tao/debug.h"
00020 #include "tao/Resume_Handle.h"
00021 #include "tao/GIOP_Message_Base.h"
00022
00023 ACE_RCSID (PortableGroup,
00024 UIPMC_Transport,
00025 "$Id: UIPMC_Transport.cpp 81159 2008-03-31 08:41:47Z johnnyw $")
00026
00027
00028
00029
00030
00031
00032
00033
00034 #define MIOP_MAX_FRAGMENTS (1)
00035 #define MIOP_MAX_HEADER_SIZE (272) // See MIOP Spec. Must be a multiple of 8.
00036 #define MIOP_MAX_DGRAM_SIZE (ACE_MAX_DGRAM_SIZE)
00037
00038 #define MIOP_MAGIC_OFFSET (0)
00039 #define MIOP_VERSION_OFFSET (4)
00040 #define MIOP_FLAGS_OFFSET (5)
00041 #define MIOP_PACKET_LENGTH_OFFSET (6)
00042 #define MIOP_PACKET_NUMBER_OFFSET (8)
00043 #define MIOP_NUMBER_OF_PACKETS_OFFSET (12)
00044 #define MIOP_ID_LENGTH_OFFSET (16)
00045 #define MIOP_MIN_LENGTH_ID (0)
00046 #define MIOP_MAX_LENGTH_ID (252)
00047 #define MIOP_ID_DEFAULT_LENGTH (12)
00048 #define MIOP_ID_CONTENT_OFFSET (20)
00049 #define MIOP_HEADER_PADDING (0) // The ID field needs to be padded to
00050
00051 #define MIOP_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \
00052 + MIOP_ID_DEFAULT_LENGTH \
00053 + MIOP_HEADER_PADDING)
00054 #define MIOP_MIN_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \
00055 + MIOP_MIN_LENGTH_ID \
00056 + (8 - MIOP_MIN_LENGTH_ID) )
00057
00058 static const CORBA::Octet miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 };
00059
00060 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00061
00062 struct MIOP_Packet
00063 {
00064 iovec iov[ACE_IOV_MAX];
00065 int iovcnt;
00066 int length;
00067 };
00068
00069 template<typename CONNECTION_HANDLER>
00070 TAO_UIPMC_Transport<CONNECTION_HANDLER>::TAO_UIPMC_Transport (
00071 CONNECTION_HANDLER *handler,
00072 TAO_ORB_Core *orb_core
00073 )
00074 : TAO_Transport (IOP::TAG_UIPMC,
00075 orb_core,
00076 MIOP_MAX_DGRAM_SIZE)
00077 , connection_handler_ (handler)
00078 {
00079
00080
00081 delete this->ws_;
00082 ACE_NEW (this->ws_,
00083 TAO_UIPMC_Wait_Never (this));
00084 }
00085
00086 template<typename CONNECTION_HANDLER>
00087 TAO_UIPMC_Transport<CONNECTION_HANDLER>::~TAO_UIPMC_Transport (void)
00088 {
00089 }
00090
00091 template<typename CONNECTION_HANDLER>
00092 ACE_Event_Handler *
00093 TAO_UIPMC_Transport<CONNECTION_HANDLER>::event_handler_i (void)
00094 {
00095 return this->connection_handler_;
00096 }
00097
00098 template<typename CONNECTION_HANDLER>
00099 TAO_Connection_Handler *
00100 TAO_UIPMC_Transport<CONNECTION_HANDLER>::connection_handler_i (void)
00101 {
00102 return this->connection_handler_;
00103 }
00104
00105 template<typename CONNECTION_HANDLER>
00106 void
00107 TAO_UIPMC_Transport<CONNECTION_HANDLER>::write_unique_id (TAO_OutputCDR &miop_hdr,
00108 unsigned long unique)
00109 {
00110
00111
00112
00113
00114
00115 static unsigned long counter = 1;
00116
00117
00118
00119 CORBA::Octet unique_id[MIOP_ID_DEFAULT_LENGTH];
00120
00121 unique_id[0] = static_cast<CORBA::Octet> (unique & 0xff);
00122 unique_id[1] = static_cast<CORBA::Octet> ((unique & 0xff00) >> 8);
00123 unique_id[2] = static_cast<CORBA::Octet> ((unique & 0xff0000) >> 16);
00124 unique_id[3] = static_cast<CORBA::Octet> ((unique & 0xff000000) >> 24);
00125
00126 unique_id[4] = static_cast<CORBA::Octet> (counter & 0xff);
00127 unique_id[5] = static_cast<CORBA::Octet> ((counter & 0xff00) >> 8);
00128 unique_id[6] = static_cast<CORBA::Octet> ((counter & 0xff0000) >> 16);
00129 unique_id[7] = static_cast<CORBA::Octet> ((counter & 0xff000000) >> 24);
00130
00131 unique_id[8] = 0;
00132 unique_id[9] = 0;
00133 unique_id[10] = 0;
00134 unique_id[11] = 0;
00135
00136 miop_hdr.write_ulong (MIOP_ID_DEFAULT_LENGTH);
00137 miop_hdr.write_octet_array (unique_id, MIOP_ID_DEFAULT_LENGTH);
00138 }
00139
00140 template<typename CONNECTION_HANDLER>
00141 ssize_t
00142 TAO_UIPMC_Transport<CONNECTION_HANDLER>::send (iovec *iov, int iovcnt,
00143 size_t &bytes_transferred,
00144 const ACE_Time_Value *)
00145 {
00146 const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00147 bytes_transferred = 0;
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158 ssize_t bytes_to_send = 0;
00159 for (int i = 0; i < iovcnt; i++)
00160 bytes_to_send += iov[i].iov_len;
00161
00162 MIOP_Packet fragments[MIOP_MAX_FRAGMENTS];
00163 MIOP_Packet *current_fragment = 0;
00164 int num_fragments = 1;
00165
00166 UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt);
00167
00168
00169 current_fragment = &fragments[0];
00170 current_fragment->iovcnt = 1;
00171 current_fragment->length = 0;
00172
00173
00174 while (mb_iter.next_block (MIOP_MAX_DGRAM_SIZE - current_fragment->length,
00175 current_fragment->iov[current_fragment->iovcnt]))
00176 {
00177
00178 current_fragment->length += current_fragment->iov[current_fragment->iovcnt].iov_len;
00179 current_fragment->iovcnt++;
00180
00181
00182
00183 if (current_fragment->length == MIOP_MAX_DGRAM_SIZE ||
00184 current_fragment->iovcnt == ACE_IOV_MAX)
00185 {
00186
00187 num_fragments++;
00188
00189
00190 if (num_fragments > MIOP_MAX_FRAGMENTS)
00191 {
00192
00193
00194
00195
00196
00197
00198
00199 if (TAO_debug_level > 0)
00200 {
00201 ACE_DEBUG ((LM_DEBUG,
00202 ACE_TEXT ("\n\nTAO (%P|%t) - ")
00203 ACE_TEXT ("UIPMC_Transport::send ")
00204 ACE_TEXT ("Message of size %d needs too many MIOP fragments (max is %d).\n")
00205 ACE_TEXT ("You may be able to increase ACE_MAX_DGRAM_SIZE.\n"),
00206 bytes_to_send,
00207 MIOP_MAX_FRAGMENTS));
00208 }
00209
00210
00211 bytes_transferred = bytes_to_send;
00212 return 1;
00213 }
00214
00215
00216 current_fragment++;
00217 current_fragment->iovcnt = 1;
00218 current_fragment->length = MIOP_HEADER_SIZE;
00219 }
00220 }
00221
00222
00223
00224
00225
00226 char header_buffer[MIOP_HEADER_SIZE + 8];
00227 TAO_OutputCDR miop_hdr (header_buffer, MIOP_HEADER_SIZE + 8);
00228
00229 miop_hdr.write_octet_array (miop_magic, 4);
00230 miop_hdr.write_octet (0x10);
00231 CORBA::Octet *flags_field = reinterpret_cast<CORBA::Octet *> (miop_hdr.current ()->wr_ptr ());
00232
00233
00234
00235
00236
00237
00238 miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER);
00239
00240
00241
00242
00243 CORBA::UShort *packet_length = reinterpret_cast<CORBA::UShort *> (miop_hdr.current ()->wr_ptr ());
00244 miop_hdr.write_short (0);
00245
00246
00247 CORBA::ULong *packet_number = reinterpret_cast<CORBA::ULong *> (miop_hdr.current ()->wr_ptr ());
00248 miop_hdr.write_ulong (0);
00249
00250
00251 miop_hdr.write_ulong (num_fragments);
00252
00253
00254 ptrdiff_t unique_id = reinterpret_cast<ptrdiff_t> (iov);
00255 this->write_unique_id (miop_hdr,
00256 static_cast<unsigned long> (unique_id));
00257
00258
00259 current_fragment = &fragments[0];
00260 while (num_fragments > 0 &&
00261 current_fragment->iovcnt > 1)
00262 {
00263
00264 *packet_length = static_cast<CORBA::UShort> (current_fragment->length);
00265
00266
00267 if (num_fragments == 1)
00268 {
00269 *flags_field |= 0x02;
00270 }
00271
00272
00273 current_fragment->iov[0].iov_base = miop_hdr.current ()->rd_ptr ();
00274 current_fragment->iov[0].iov_len = MIOP_HEADER_SIZE;
00275
00276
00277 ssize_t rc = this->connection_handler_->send (current_fragment->iov,
00278 current_fragment->iovcnt,
00279 addr);
00280
00281 if (rc <= 0)
00282 {
00283 if (TAO_debug_level > 0)
00284 {
00285 ACE_DEBUG ((LM_DEBUG,
00286 ACE_TEXT ("\n\nTAO (%P|%t) - ")
00287 ACE_TEXT ("UIPMC_Transport::send")
00288 ACE_TEXT (" %p\n\n"),
00289 ACE_TEXT ("Error returned from transport:")));
00290 }
00291
00292
00293 bytes_transferred = bytes_to_send;
00294 return 1;
00295 }
00296
00297
00298
00299 bytes_transferred += rc - MIOP_HEADER_SIZE;
00300
00301 if (TAO_debug_level > 0)
00302 {
00303 ACE_DEBUG ((LM_DEBUG,
00304 "TAO_UIPMC_Transport::send: sent %d bytes to %s:%d\n",
00305 rc,
00306 addr.get_host_addr (),
00307 addr.get_port_number ()));
00308 }
00309
00310
00311 (*packet_number)++;
00312 ++current_fragment;
00313 --num_fragments;
00314 }
00315
00316
00317 return bytes_transferred;
00318 }
00319
00320 template<typename CONNECTION_HANDLER>
00321 ssize_t
00322 TAO_UIPMC_Transport<CONNECTION_HANDLER>::recv (char *buf,
00323 size_t len,
00324 const ACE_Time_Value * )
00325 {
00326 ACE_INET_Addr from_addr;
00327
00328 ssize_t n = this->connection_handler_->peer ().recv (buf,
00329 len,
00330 from_addr);
00331 if (TAO_debug_level > 5)
00332 {
00333 ACE_DEBUG ((LM_DEBUG,
00334 "TAO_UIPMC_Transport::recv: received %d bytes from %s:%d\n",
00335 n,
00336 from_addr.get_host_addr (),
00337 from_addr.get_port_number ()));
00338 }
00339
00340
00341 if (n < MIOP_MIN_HEADER_SIZE)
00342 {
00343 if (TAO_debug_level > 0)
00344 {
00345 ACE_DEBUG ((LM_DEBUG,
00346 "TAO_UIPMC_Transport::recv: packet of size %d is too small from %s:%d\n",
00347 n,
00348 from_addr.get_host_addr (),
00349 from_addr.get_port_number ()));
00350 }
00351 return 0;
00352 }
00353
00354
00355 if (buf[MIOP_MAGIC_OFFSET] != miop_magic [0] ||
00356 buf[MIOP_MAGIC_OFFSET + 1] != miop_magic [1] ||
00357 buf[MIOP_MAGIC_OFFSET + 2] != miop_magic [2] ||
00358 buf[MIOP_MAGIC_OFFSET + 3] != miop_magic [3])
00359 {
00360 if (TAO_debug_level > 0)
00361 {
00362 ACE_DEBUG ((LM_DEBUG,
00363 "TAO_UIPMC_Transport::recv: UIPMC packet didn't contain magic bytes.\n"));
00364 }
00365
00366 return 0;
00367 }
00368
00369
00370
00371
00372 CORBA::Octet byte_order = buf[MIOP_FLAGS_OFFSET] & 0x01;
00373
00374
00375
00376
00377 CORBA::ULong id_length;
00378 #if !defined (ACE_DISABLE_SWAP_ON_READ)
00379 if (byte_order == ACE_CDR_BYTE_ORDER)
00380 {
00381 id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]);
00382 }
00383 else
00384 {
00385 ACE_CDR::swap_4 (&buf[MIOP_ID_LENGTH_OFFSET],
00386 reinterpret_cast<char*> (&id_length));
00387 }
00388 #else
00389 id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]);
00390 #endif
00391
00392
00393 if (id_length > MIOP_MAX_LENGTH_ID ||
00394 static_cast<ssize_t> (MIOP_ID_CONTENT_OFFSET + id_length) > n)
00395 {
00396 if (TAO_debug_level > 0)
00397 {
00398 ACE_DEBUG ((LM_DEBUG,
00399 "TAO_UIPMC_Transport::recv: Invalid ID length.\n"));
00400 }
00401
00402 return 0;
00403 }
00404
00405
00406 ssize_t const miop_header_size = (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7;
00407 if (miop_header_size > n)
00408 {
00409 if (TAO_debug_level > 0)
00410 {
00411 ACE_DEBUG ((LM_DEBUG,
00412 "TAO_UIPMC_Transport::recv: MIOP packet not large enough for padding.\n"));
00413 }
00414
00415 return 0;
00416 }
00417
00418 n -= miop_header_size;
00419 ACE_OS::memmove (buf, buf + miop_header_size, n);
00420
00421 return n;
00422 }
00423
00424 template<typename CONNECTION_HANDLER>
00425 int
00426 TAO_UIPMC_Transport<CONNECTION_HANDLER>::handle_input (TAO_Resume_Handle &rh,
00427 ACE_Time_Value *max_wait_time)
00428 {
00429
00430
00431
00432
00433
00434 char buf [MIOP_MAX_DGRAM_SIZE];
00435
00436 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00437 (void) ACE_OS::memset (buf,
00438 '\0',
00439 sizeof buf);
00440 #endif
00441
00442
00443 ACE_Data_Block db (sizeof (buf),
00444 ACE_Message_Block::MB_DATA,
00445 buf,
00446 this->orb_core_->input_cdr_buffer_allocator (),
00447 this->orb_core_->locking_strategy (),
00448 ACE_Message_Block::DONT_DELETE,
00449 this->orb_core_->input_cdr_dblock_allocator ());
00450
00451
00452 ACE_Message_Block message_block (&db,
00453 ACE_Message_Block::DONT_DELETE,
00454 this->orb_core_->input_cdr_msgblock_allocator ());
00455
00456
00457
00458 ACE_CDR::mb_align (&message_block);
00459
00460
00461
00462
00463 ssize_t n = this->recv (message_block.rd_ptr (),
00464 message_block.space (),
00465 max_wait_time);
00466
00467
00468 if (n <= 0)
00469 {
00470 if (TAO_debug_level)
00471 {
00472 ACE_DEBUG ((LM_DEBUG,
00473 ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"),
00474 this->id (),
00475 ACE_TEXT ("handle_input ()\n")));
00476 }
00477
00478 if (n == -1)
00479 this->tms_->connection_closed ();
00480
00481 return n;
00482 }
00483
00484
00485 message_block.wr_ptr (n);
00486
00487
00488 TAO_Queued_Data qd (&message_block);
00489 size_t mesg_length = 0;
00490
00491
00492
00493 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1)
00494 {
00495 if (TAO_debug_level)
00496 {
00497 ACE_DEBUG ((LM_DEBUG,
00498 ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"),
00499 this->id () ));
00500 }
00501
00502 return -1;
00503 }
00504
00505 if (message_block.length () > mesg_length)
00506 {
00507 if (TAO_debug_level)
00508 {
00509 ACE_DEBUG ((LM_DEBUG,
00510 ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"),
00511 this->id () ));
00512 }
00513
00514 return -1;
00515 }
00516
00517
00518
00519
00520
00521
00522 return this->process_parsed_messages (&qd, rh);
00523 }
00524
00525 template<typename CONNECTION_HANDLER>
00526 int
00527 TAO_UIPMC_Transport<CONNECTION_HANDLER>::register_handler (void)
00528 {
00529
00530
00531
00532
00533
00534
00535
00536 return 0;
00537 }
00538
00539 template<typename CONNECTION_HANDLER>
00540 int
00541 TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_request (TAO_Stub *stub,
00542 TAO_ORB_Core *orb_core,
00543 TAO_OutputCDR &stream,
00544 TAO_Message_Semantics message_semantics,
00545 ACE_Time_Value *max_wait_time)
00546 {
00547 if (this->ws_->sending_request (orb_core,
00548 message_semantics) == -1)
00549 return -1;
00550
00551 if (this->send_message (stream,
00552 stub,
00553 message_semantics,
00554 max_wait_time) == -1)
00555
00556 return -1;
00557
00558 return 0;
00559 }
00560
00561 template<typename CONNECTION_HANDLER>
00562 int
00563 TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_message (TAO_OutputCDR &stream,
00564 TAO_Stub *stub,
00565 TAO_Message_Semantics message_semantics,
00566 ACE_Time_Value *max_wait_time)
00567 {
00568
00569 if (this->messaging_object_->format_message (stream) != 0)
00570 return -1;
00571
00572
00573
00574
00575
00576
00577 ssize_t n = this->send_message_shared (stub,
00578 message_semantics,
00579 stream.begin (),
00580 max_wait_time);
00581
00582 if (n == -1)
00583 {
00584 if (TAO_debug_level)
00585 ACE_DEBUG ((LM_DEBUG,
00586 ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %m\n"),
00587 this->id (),
00588 ACE_TEXT ("send_message ()\n")));
00589
00590 return -1;
00591 }
00592
00593 return 1;
00594 }
00595
00596 TAO_END_VERSIONED_NAMESPACE_DECL
00597
00598 #endif