00001
00002
00003 #include "orbsvcs/PortableGroup/UIPMC_Profile.h"
00004 #include "orbsvcs/PortableGroup/UIPMC_Transport.h"
00005 #include "orbsvcs/PortableGroup/UIPMC_Connection_Handler.h"
00006 #include "orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h"
00007 #include "orbsvcs/PortableGroup/UIPMC_Acceptor.h"
00008 #include "orbsvcs/PortableGroup/UIPMC_Wait_Never.h"
00009
00010 #include "tao/Acceptor_Registry.h"
00011 #include "tao/operation_details.h"
00012 #include "tao/Timeprobe.h"
00013 #include "tao/CDR.h"
00014 #include "tao/Transport_Mux_Strategy.h"
00015 #include "tao/Wait_Strategy.h"
00016 #include "tao/Stub.h"
00017 #include "tao/ORB_Core.h"
00018 #include "tao/debug.h"
00019 #include "tao/Resume_Handle.h"
00020 #include "tao/GIOP_Message_Base.h"
00021 #include "tao/GIOP_Message_Lite.h"
00022
00023 ACE_RCSID (PortableGroup,
00024 UIPMC_Transport,
00025 "UIPMC_Transport.cpp,v 1.37 2006/05/23 16:09:34 mitza Exp")
00026
00027
00028
00029
00030
00031
00032
00033
00034
// Limits applied when sending: send() builds at most MIOP_MAX_FRAGMENTS
// datagrams, each holding no more than MIOP_MAX_DGRAM_SIZE bytes.
#define MIOP_MAX_FRAGMENTS    (1)
#define MIOP_MAX_HEADER_SIZE  (272)
#define MIOP_MAX_DGRAM_SIZE   (ACE_MAX_DGRAM_SIZE)

// Byte offsets of the individual fields inside a MIOP packet header.
#define MIOP_MAGIC_OFFSET              (0)
#define MIOP_VERSION_OFFSET            (4)
#define MIOP_FLAGS_OFFSET              (5)
#define MIOP_PACKET_LENGTH_OFFSET      (6)
#define MIOP_PACKET_NUMBER_OFFSET      (8)
#define MIOP_NUMBER_OF_PACKETS_OFFSET  (12)
#define MIOP_ID_LENGTH_OFFSET          (16)
#define MIOP_ID_CONTENT_OFFSET         (20)

// Bounds on the length of the unique-ID field, and the length that
// write_unique_id() actually emits.
#define MIOP_MIN_LENGTH_ID      (0)
#define MIOP_MAX_LENGTH_ID      (252)
#define MIOP_ID_DEFAULT_LENGTH  (12)

// Extra bytes appended after the unique ID in headers built by send().
#define MIOP_HEADER_PADDING  (0)

// Size of the header that send() prepends to every outgoing datagram.
#define MIOP_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET + MIOP_ID_DEFAULT_LENGTH + MIOP_HEADER_PADDING)

// Smallest datagram that recv() is willing to look at.
#define MIOP_MIN_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET + MIOP_MIN_LENGTH_ID + (8 - MIOP_MIN_LENGTH_ID) )
00058
00059 static const CORBA::Octet miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 };
00060
00061 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00062
00063 struct MIOP_Packet
00064 {
00065 iovec iov[ACE_IOV_MAX];
00066 int iovcnt;
00067 int length;
00068 };
00069
00070
00071 TAO_UIPMC_Transport::TAO_UIPMC_Transport (TAO_UIPMC_Connection_Handler *handler,
00072 TAO_ORB_Core *orb_core,
00073 CORBA::Boolean )
00074 : TAO_Transport (IOP::TAG_UIPMC,
00075 orb_core)
00076 , connection_handler_ (handler)
00077 , messaging_object_ (0)
00078 {
00079
00080 ACE_NEW (this->messaging_object_,
00081 TAO_GIOP_Message_Base (orb_core,
00082 this,
00083 MIOP_MAX_DGRAM_SIZE));
00084
00085
00086
00087 delete this->ws_;
00088 ACE_NEW (this->ws_,
00089 TAO_UIPMC_Wait_Never (this));
00090 }
00091
00092 TAO_UIPMC_Transport::~TAO_UIPMC_Transport (void)
00093 {
00094 delete this->messaging_object_;
00095 }
00096
00097 ACE_Event_Handler *
00098 TAO_UIPMC_Transport::event_handler_i (void)
00099 {
00100 return this->connection_handler_;
00101 }
00102
00103 TAO_Connection_Handler *
00104 TAO_UIPMC_Transport::connection_handler_i (void)
00105 {
00106 return this->connection_handler_;
00107 }
00108
00109 TAO_Pluggable_Messaging *
00110 TAO_UIPMC_Transport::messaging_object (void)
00111 {
00112 return this->messaging_object_;
00113 }
00114
00115
00116 void
00117 TAO_UIPMC_Transport::write_unique_id (TAO_OutputCDR &miop_hdr, unsigned long unique)
00118 {
00119
00120
00121
00122
00123
00124 static unsigned long counter = 1;
00125
00126
00127
00128 CORBA::Octet unique_id[MIOP_ID_DEFAULT_LENGTH];
00129
00130 unique_id[0] = static_cast<CORBA::Octet> (unique & 0xff);
00131 unique_id[1] = static_cast<CORBA::Octet> ((unique & 0xff00) >> 8);
00132 unique_id[2] = static_cast<CORBA::Octet> ((unique & 0xff0000) >> 16);
00133 unique_id[3] = static_cast<CORBA::Octet> ((unique & 0xff000000) >> 24);
00134
00135 unique_id[4] = static_cast<CORBA::Octet> (counter & 0xff);
00136 unique_id[5] = static_cast<CORBA::Octet> ((counter & 0xff00) >> 8);
00137 unique_id[6] = static_cast<CORBA::Octet> ((counter & 0xff0000) >> 16);
00138 unique_id[7] = static_cast<CORBA::Octet> ((counter & 0xff000000) >> 24);
00139
00140 unique_id[8] = 0;
00141 unique_id[9] = 0;
00142 unique_id[10] = 0;
00143 unique_id[11] = 0;
00144
00145 miop_hdr.write_ulong (MIOP_ID_DEFAULT_LENGTH);
00146 miop_hdr.write_octet_array (unique_id, MIOP_ID_DEFAULT_LENGTH);
00147 }
00148
00149 ssize_t
00150 TAO_UIPMC_Transport::send (iovec *iov, int iovcnt,
00151 size_t &bytes_transferred,
00152 const ACE_Time_Value *)
00153 {
00154 const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00155 bytes_transferred = 0;
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166 ssize_t bytes_to_send = 0;
00167 for (int i = 0; i < iovcnt; i++)
00168 bytes_to_send += iov[i].iov_len;
00169
00170 MIOP_Packet fragments[MIOP_MAX_FRAGMENTS];
00171 MIOP_Packet *current_fragment;
00172 int num_fragments = 1;
00173
00174 UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt);
00175
00176
00177 current_fragment = &fragments[0];
00178 current_fragment->iovcnt = 1;
00179 current_fragment->length = MIOP_HEADER_SIZE;
00180
00181
00182 while (mb_iter.next_block (MIOP_MAX_DGRAM_SIZE - current_fragment->length,
00183 current_fragment->iov[current_fragment->iovcnt]))
00184 {
00185
00186 current_fragment->length += current_fragment->iov[current_fragment->iovcnt].iov_len;
00187 current_fragment->iovcnt++;
00188
00189
00190
00191 if (current_fragment->length == MIOP_MAX_DGRAM_SIZE ||
00192 current_fragment->iovcnt == ACE_IOV_MAX)
00193 {
00194
00195 num_fragments++;
00196
00197
00198 if (num_fragments > MIOP_MAX_FRAGMENTS)
00199 {
00200
00201
00202
00203
00204
00205
00206
00207 if (TAO_debug_level > 0)
00208 {
00209 ACE_DEBUG ((LM_DEBUG,
00210 ACE_TEXT ("\n\nTAO (%P|%t) ")
00211 ACE_TEXT ("UIPMC_Transport::send_i ")
00212 ACE_TEXT ("Message of size %d needs too many MIOP fragments (max is %d).\n")
00213 ACE_TEXT ("You may be able to increase ACE_MAX_DGRAM_SIZE.\n"),
00214 bytes_to_send,
00215 MIOP_MAX_FRAGMENTS));
00216 }
00217
00218
00219 bytes_transferred = bytes_to_send;
00220 return 1;
00221 }
00222
00223
00224 current_fragment++;
00225 current_fragment->iovcnt = 1;
00226 current_fragment->length = MIOP_HEADER_SIZE;
00227 }
00228 }
00229
00230
00231
00232
00233
00234 char header_buffer[MIOP_HEADER_SIZE + 8];
00235 TAO_OutputCDR miop_hdr (header_buffer, MIOP_HEADER_SIZE + 8);
00236
00237 miop_hdr.write_octet_array (miop_magic, 4);
00238 miop_hdr.write_octet (0x10);
00239 CORBA::Octet *flags_field = reinterpret_cast<CORBA::Octet *> (miop_hdr.current ()->wr_ptr ());
00240
00241
00242
00243
00244
00245
00246 miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER);
00247
00248
00249
00250
00251 CORBA::UShort *packet_length = reinterpret_cast<CORBA::UShort *> (miop_hdr.current ()->wr_ptr ());
00252 miop_hdr.write_short (0);
00253
00254
00255 CORBA::ULong *packet_number = reinterpret_cast<CORBA::ULong *> (miop_hdr.current ()->wr_ptr ());
00256 miop_hdr.write_ulong (0);
00257
00258
00259 miop_hdr.write_ulong (num_fragments);
00260
00261
00262 ptrdiff_t unique_id = reinterpret_cast<ptrdiff_t> (iov);
00263 this->write_unique_id (miop_hdr,
00264 static_cast<unsigned long> (unique_id));
00265
00266
00267 current_fragment = &fragments[0];
00268 while (num_fragments > 0 &&
00269 current_fragment->iovcnt > 1)
00270 {
00271
00272 *packet_length = static_cast<CORBA::UShort> (current_fragment->length);
00273
00274
00275 if (num_fragments == 1)
00276 {
00277 *flags_field |= 0x02;
00278 }
00279
00280
00281 current_fragment->iov[0].iov_base = miop_hdr.current ()->rd_ptr ();
00282 current_fragment->iov[0].iov_len = MIOP_HEADER_SIZE;
00283
00284
00285 ssize_t rc = this->connection_handler_->dgram ().send (current_fragment->iov,
00286 current_fragment->iovcnt,
00287 addr);
00288
00289 if (rc <= 0)
00290 {
00291 if (TAO_debug_level > 0)
00292 {
00293 ACE_DEBUG ((LM_DEBUG,
00294 ACE_TEXT ("\n\nTAO (%P|%t) ")
00295 ACE_TEXT ("UIPMC_Transport::send")
00296 ACE_TEXT (" %p\n\n"),
00297 ACE_TEXT ("Error returned from transport:")));
00298 }
00299
00300
00301 bytes_transferred = bytes_to_send;
00302 return 1;
00303 }
00304
00305
00306
00307 bytes_transferred += rc - MIOP_HEADER_SIZE;
00308
00309 if (TAO_debug_level > 0)
00310 {
00311 ACE_DEBUG ((LM_DEBUG,
00312 "TAO_UIPMC_Transport::send_i: sent %d bytes to %s:%d\n",
00313 rc,
00314 addr.get_host_addr (),
00315 addr.get_port_number ()));
00316 }
00317
00318
00319 (*packet_number)++;
00320 ++current_fragment;
00321 --num_fragments;
00322 }
00323
00324
00325 return bytes_transferred;
00326 }
00327
00328
00329 ssize_t
00330 TAO_UIPMC_Transport::recv (char *buf,
00331 size_t len,
00332 const ACE_Time_Value * )
00333 {
00334 ACE_INET_Addr from_addr;
00335
00336 ssize_t n = this->connection_handler_->mcast_dgram ().recv (buf,
00337 len,
00338 from_addr);
00339 if (TAO_debug_level > 5)
00340 {
00341 ACE_DEBUG ((LM_DEBUG,
00342 "TAO_UIPMC_Transport::recv_i: received %d bytes from %s:%d\n",
00343 n,
00344 from_addr.get_host_addr (),
00345 from_addr.get_port_number ()));
00346 }
00347
00348
00349 if (n < MIOP_MIN_HEADER_SIZE)
00350 {
00351 if (TAO_debug_level > 0)
00352 {
00353 ACE_DEBUG ((LM_DEBUG,
00354 "TAO_UIPMC_Transport::recv_i: packet of size %d is too small from %s:%d\n",
00355 n,
00356 from_addr.get_host_addr (),
00357 from_addr.get_port_number ()));
00358 }
00359 return 0;
00360 }
00361
00362
00363 if (buf[MIOP_MAGIC_OFFSET] != miop_magic [0] ||
00364 buf[MIOP_MAGIC_OFFSET + 1] != miop_magic [1] ||
00365 buf[MIOP_MAGIC_OFFSET + 2] != miop_magic [2] ||
00366 buf[MIOP_MAGIC_OFFSET + 3] != miop_magic [3])
00367 {
00368 if (TAO_debug_level > 0)
00369 {
00370 ACE_DEBUG ((LM_DEBUG,
00371 "TAO_UIPMC_Transport::recv_i: UIPMC packet didn't contain magic bytes.\n"));
00372 }
00373
00374 return 0;
00375 }
00376
00377
00378
00379
00380 CORBA::Octet byte_order = buf[MIOP_FLAGS_OFFSET] & 0x01;
00381
00382
00383
00384
00385 CORBA::ULong id_length;
00386 #if !defined (ACE_DISABLE_SWAP_ON_READ)
00387 if (byte_order == ACE_CDR_BYTE_ORDER)
00388 {
00389 id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]);
00390 }
00391 else
00392 {
00393 ACE_CDR::swap_4 (&buf[MIOP_ID_LENGTH_OFFSET],
00394 reinterpret_cast<char*> (&id_length));
00395 }
00396 #else
00397 id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]);
00398 #endif
00399
00400
00401 if (id_length > MIOP_MAX_LENGTH_ID ||
00402 static_cast<ssize_t> (MIOP_ID_CONTENT_OFFSET + id_length) > n)
00403 {
00404 if (TAO_debug_level > 0)
00405 {
00406 ACE_DEBUG ((LM_DEBUG,
00407 "TAO_UIPMC_Transport::recv_i: Invalid ID length.\n"));
00408 }
00409
00410 return 0;
00411 }
00412
00413
00414 ssize_t miop_header_size = (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7;
00415 if (miop_header_size > n)
00416 {
00417 if (TAO_debug_level > 0)
00418 {
00419 ACE_DEBUG ((LM_DEBUG,
00420 "TAO_UIPMC_Transport::recv_i: MIOP packet not large enough for padding.\n"));
00421 }
00422
00423 return 0;
00424 }
00425
00426 n -= miop_header_size;
00427 ACE_OS::memmove (buf, buf + miop_header_size, n);
00428
00429 return n;
00430 }
00431
00432 int
00433 TAO_UIPMC_Transport::handle_input (TAO_Resume_Handle &rh,
00434 ACE_Time_Value *max_wait_time,
00435 int )
00436 {
00437
00438
00439
00440
00441
00442 char buf [MIOP_MAX_DGRAM_SIZE];
00443
00444 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00445 (void) ACE_OS::memset (buf,
00446 '\0',
00447 sizeof buf);
00448 #endif
00449
00450
00451 ACE_Data_Block db (sizeof (buf),
00452 ACE_Message_Block::MB_DATA,
00453 buf,
00454 this->orb_core_->input_cdr_buffer_allocator (),
00455 this->orb_core_->locking_strategy (),
00456 ACE_Message_Block::DONT_DELETE,
00457 this->orb_core_->input_cdr_dblock_allocator ());
00458
00459
00460 ACE_Message_Block message_block (&db,
00461 ACE_Message_Block::DONT_DELETE,
00462 this->orb_core_->input_cdr_msgblock_allocator ());
00463
00464
00465
00466 ACE_CDR::mb_align (&message_block);
00467
00468
00469
00470
00471 ssize_t n = this->recv (message_block.rd_ptr (),
00472 message_block.space (),
00473 max_wait_time);
00474
00475
00476 if (n <= 0)
00477 {
00478 if (TAO_debug_level)
00479 {
00480 ACE_DEBUG ((LM_DEBUG,
00481 ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"),
00482 this->id (),
00483 ACE_TEXT ("handle_input_i ()\n")));
00484 }
00485
00486 if (n == -1)
00487 this->tms_->connection_closed ();
00488
00489 return n;
00490 }
00491
00492
00493 message_block.wr_ptr (n);
00494
00495
00496
00497 TAO_Queued_Data qd (&message_block);
00498 size_t mesg_length;
00499
00500
00501
00502 if (this->messaging_object ()->parse_next_message (message_block,
00503 qd,
00504 mesg_length) == -1)
00505 {
00506 if (TAO_debug_level)
00507 {
00508 ACE_DEBUG ((LM_DEBUG,
00509 ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"),
00510 this->id () ));
00511 }
00512
00513 return -1;
00514 }
00515
00516 if (message_block.length () > mesg_length)
00517 {
00518 if (TAO_debug_level)
00519 {
00520 ACE_DEBUG ((LM_DEBUG,
00521 ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"),
00522 this->id () ));
00523 }
00524
00525 return -1;
00526 }
00527
00528
00529
00530
00531
00532
00533 return this->process_parsed_messages (&qd, rh);
00534 }
00535
00536 int
00537 TAO_UIPMC_Transport::register_handler (void)
00538 {
00539
00540
00541
00542
00543
00544
00545
00546 return 0;
00547 }
00548
00549 int
00550 TAO_UIPMC_Transport::send_request (TAO_Stub *stub,
00551 TAO_ORB_Core *orb_core,
00552 TAO_OutputCDR &stream,
00553 int message_semantics,
00554 ACE_Time_Value *max_wait_time)
00555 {
00556 if (this->ws_->sending_request (orb_core,
00557 message_semantics) == -1)
00558 return -1;
00559
00560 if (this->send_message (stream,
00561 stub,
00562 message_semantics,
00563 max_wait_time) == -1)
00564
00565 return -1;
00566
00567 return 0;
00568 }
00569
00570 int
00571 TAO_UIPMC_Transport::send_message (TAO_OutputCDR &stream,
00572 TAO_Stub *stub,
00573 int message_semantics,
00574 ACE_Time_Value *max_wait_time)
00575 {
00576
00577 if (this->messaging_object_->format_message (stream) != 0)
00578 return -1;
00579
00580
00581
00582
00583
00584
00585 ssize_t n = this->send_message_shared (stub,
00586 message_semantics,
00587 stream.begin (),
00588 max_wait_time);
00589
00590 if (n == -1)
00591 {
00592 if (TAO_debug_level)
00593 ACE_DEBUG ((LM_DEBUG,
00594 ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
00595 this->id (),
00596 ACE_TEXT ("send_message ()\n")));
00597
00598 return -1;
00599 }
00600
00601 return 1;
00602 }
00603
00604 int
00605 TAO_UIPMC_Transport::messaging_init (CORBA::Octet major,
00606 CORBA::Octet minor)
00607 {
00608 this->messaging_object_->init (major,
00609 minor);
00610 return 1;
00611 }
00612
00613 TAO_END_VERSIONED_NAMESPACE_DECL