#include <Link.h>


Public Member Functions | |
| ~Link () | |
| Link (Address const &addr, Parameters const ¶ms) | |
| virtual void | in_start (In_Element *in) |
| virtual void | out_start (Out_Element *out) |
| virtual void | in_stop () |
| virtual void | send (Message_ptr m) |
Private Member Functions | |
| virtual void | send_ (Message_ptr m) |
| void | recv () |
| virtual void | recv (Message_ptr) |
Static Private Member Functions | |
| static ACE_THR_FUNC_RETURN | recv_thunk (void *obj) |
Private Attributes | |
| Parameters const & | params_ |
| Address | addr_ |
| Address | self_ |
| ACE_SOCK_Dgram_Mcast | rsock_ |
| ACE_SOCK_Dgram | ssock_ |
| bool | stop_ |
| ACE_Thread_Manager | recv_mgr_ |
| Message_ptr | hold_ |
| Mutex | mutex_ |
Definition at line 19 of file Link.h.
| ACE_RMCast::Link::~Link | ( | ) |
| ACE_RMCast::Link::Link | ( | Address const & | addr, | |
| Parameters const & | params | |||
| ) |
Definition at line 21 of file Link.cpp.
: params_ (params), addr_ (addr), ssock_ (Address (static_cast<unsigned short> (0), static_cast<ACE_UINT32> (INADDR_ANY)), AF_INET, IPPROTO_UDP, 1), stop_ (false) { ACE_OS::srand ((unsigned int) ACE_OS::time (0)); rsock_.set_option (IP_MULTICAST_LOOP, 0); // rsock_.set_option (IP_MULTICAST_TTL, 0); // Set recv/send buffers. // { int r (131070); int s (sizeof (r)); static_cast<ACE_SOCK&> (rsock_).set_option ( SOL_SOCKET, SO_RCVBUF, &r, s); static_cast<ACE_SOCK&> (ssock_).set_option ( SOL_SOCKET, SO_RCVBUF, &r, s); rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); //cerr << 5 << "recv buffer size: " << r << endl; ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); //cerr << 5 << "send buffer size: " << r << endl; } // Bind address and port. // if (ACE_OS::connect (ssock_.get_handle (), reinterpret_cast<sockaddr*> (addr_.get_addr ()), addr_.get_addr_size ()) == -1) { ACE_OS::perror ("connect: "); ACE_OS::abort (); } ssock_.get_local_addr (self_);
| void ACE_RMCast::Link::in_start | ( | In_Element * | in | ) | [virtual] |
| void ACE_RMCast::Link::in_stop | ( | ) | [virtual] |
| void ACE_RMCast::Link::out_start | ( | Out_Element * | out | ) | [virtual] |
| void ACE_RMCast::Link::recv | ( | Message_ptr | ) | [private, virtual] |
| void ACE_RMCast::Link::recv | ( | ) | [private] |
Definition at line 185 of file Link.cpp.
{
size_t max_packet_size (params_.max_packet_size ());
// This is wicked.
//
ACE_Auto_Ptr<char> holder (
reinterpret_cast<char*> (
operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
size_t size (0);
while (true)
{
//@@ Should I lock here?
//
Address addr;
// Block for up to one tick waiting for an incomming message.
//
for (;;)
{
ACE_Time_Value t (params_.tick ());
ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
// Check for cancellation request.
//
{
Lock l (mutex_);
if (stop_)
return;
}
if (r == -1)
{
if (errno != ETIME)
ACE_OS::abort ();
}
else
{
size = static_cast<size_t> (r);
break;
}
}
if (size != 4 || addr == self_)
{
// Discard bad messages and ones from ourselvs since
// we are using reliable loopback.
//
rsock_.recv (data, 0, addr);
continue;
}
u32 msg_size;
{
istream is (data, size, 1); // Always little-endian.
is >> msg_size;
}
if (msg_size <= 4 || msg_size > max_packet_size)
{
// Bad message.
//
rsock_.recv (data, 0, addr);
continue;
}
size = rsock_.recv (data, max_packet_size, addr);
if (msg_size != size)
{
// Bad message.
//
continue;
}
//cerr << 6 << "from: " << addr << endl;
Message_ptr m (new Message ());
m->add (Profile_ptr (new From (addr)));
m->add (Profile_ptr (new To (self_)));
istream is (data, size, 1); // Always little-endian.
is >> msg_size;
while (true)
{
u16 id, size;
if (!((is >> id) && (is >> size))) break;
//cerr << 6 << "reading profile with id " << id << " "
// << size << " bytes long" << endl;
Profile::Header hdr (id, size);
if (id == SN::id)
{
m->add (Profile_ptr (new SN (hdr, is)));
}
else if (id == Data::id)
{
m->add (Profile_ptr (new Data (hdr, is)));
}
else if (id == NAK::id)
{
m->add (Profile_ptr (new NAK (hdr, is)));
}
else if (id == NRTM::id)
{
m->add (Profile_ptr (new NRTM (hdr, is)));
}
else if (id == NoData::id)
{
m->add (Profile_ptr (new NoData (hdr, is)));
}
else if (id == Part::id)
{
m->add (Profile_ptr (new Part (hdr, is)));
}
else
{
//cerr << 0 << "unknown profile id " << hdr.id () << endl;
ACE_OS::abort ();
}
}
in_->recv (m);
| ACE_THR_FUNC_RETURN ACE_RMCast::Link::recv_thunk | ( | void * | obj | ) | [static, private] |
| void ACE_RMCast::Link::send | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 106 of file Link.cpp.
{
// Simulate message loss and reordering.
//
if (params_.simulator ())
{
if ((ACE_OS::rand () % 17) != 0)
{
Lock l (mutex_);
if (hold_.get ())
{
send_ (m);
send_ (hold_);
hold_ = Message_ptr (0);
}
else
{
if ((ACE_OS::rand () % 17) != 0)
{
send_ (m);
}
else
{
hold_ = m;
// Make a copy in M so that the reliable loop below
// won't add FROM and TO to HOLD_.
//
m = hold_->clone ();
}
}
}
}
else
send_ (m);
// Reliable loop.
//
m->add (Profile_ptr (new From (self_)));
m->add (Profile_ptr (new To (self_)));
| void ACE_RMCast::Link::send_ | ( | Message_ptr | m | ) | [private, virtual] |
Definition at line 152 of file Link.cpp.
{
ostream os (m->size (), 1); // Always little-endian.
os << *m;
if (os.length () > size_t (params_.max_packet_size ()))
{
ACE_ERROR ((LM_ERROR,
"packet length (%d) exceeds max_poacket_size (%d)\n",
os.length (), params_.max_packet_size ()));
for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
{
ACE_ERROR ((LM_ERROR,
"profile id: %d; size: %d\n",
(*i).ext_id_, (*i).int_id_->size ()));
}
ACE_OS::abort ();
}
ssock_.send (os.buffer (), os.length (), addr_);
/*
if (m->find (nrtm::id))
{
ACE_OS::write (1, os.buffer (), os.length ());
ACE_OS::exit (1);
}
Address ACE_RMCast::Link::addr_ [private] |
Message_ptr ACE_RMCast::Link::hold_ [private] |
Mutex ACE_RMCast::Link::mutex_ [private] |
Parameters const& ACE_RMCast::Link::params_ [private] |
ACE_SOCK_Dgram_Mcast ACE_RMCast::Link::rsock_ [private] |
Address ACE_RMCast::Link::self_ [private] |
ACE_SOCK_Dgram ACE_RMCast::Link::ssock_ [private] |
bool ACE_RMCast::Link::stop_ [private] |
1.7.0