Acknowledge.h

Go to the documentation of this file.
00001 // file      : ace/RMCast/Acknowledge.h
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : Acknowledge.h,v 1.10 2005/10/16 14:31:48 schmidt Exp
00004 
00005 #ifndef ACE_RMCAST_ACKNOWLEDGE_H
00006 #define ACE_RMCAST_ACKNOWLEDGE_H
00007 
00008 #include "ace/Hash_Map_Manager.h"
00009 #include "ace/Thread_Manager.h"
00010 
00011 #include "Stack.h"
00012 #include "Protocol.h"
00013 #include "Bits.h"
00014 #include "Parameters.h"
00015 
00016 #if !defined (ACE_RMCAST_DEFAULT_MAP_SIZE)
00017 #define ACE_RMCAST_DEFAULT_MAP_SIZE 10
00018 #endif /* ACE_RMCAST_DEFAULT_MAP_SIZE */
00019 
00020 #if !defined (ACE_RMCAST_DEFAULT_QUEUE_SIZE)
00021 #define ACE_RMCAST_DEFAULT_QUEUE_SIZE 10
00022 #endif /* ACE_RMCAST_DEFAULT_QUEUE_SIZE */
00023 
00024 namespace ACE_RMCast
00025 {
00026   class Acknowledge : public Element
00027   {
00028   public:
00029     Acknowledge (Parameters const& params);
00030 
00031     virtual void
00032     in_start (In_Element* in);
00033 
00034     virtual void
00035     out_start (Out_Element* out);
00036 
00037     virtual void
00038     out_stop ();
00039 
00040   public:
00041     virtual void
00042     recv (Message_ptr m);
00043 
00044     virtual void
00045     send (Message_ptr m);
00046 
00047   // Sun C++ 5.4 can't handle private here.
00048   //
00049   // private:
00050   public:
00051     struct Descr
00052     {
00053       //@@ There should be no default c-tor.
00054       //
00055       Descr ()
00056           : nak_count_ (0), timer_ (1)
00057       {
00058       }
00059 
00060       Descr (unsigned long timer)
00061           : nak_count_ (0), timer_ (timer)
00062       {
00063       }
00064 
00065       Descr (Message_ptr m)
00066           : m_ (m)
00067       {
00068       }
00069 
00070     public:
00071       bool
00072       lost () const
00073       {
00074         return m_.get () == 0;
00075       }
00076 
00077     public:
00078       Message_ptr
00079       msg ()
00080       {
00081         return m_;
00082       }
00083 
00084       void
00085       msg (Message_ptr m)
00086       {
00087         m_ = m;
00088       }
00089 
00090     public:
00091       unsigned long
00092       nak_count () const
00093       {
00094         return nak_count_;
00095       }
00096 
00097       void
00098       nak_count (unsigned long v)
00099       {
00100         nak_count_ = v;
00101       }
00102 
00103       unsigned long
00104       timer () const
00105       {
00106         return timer_;
00107       }
00108 
00109       void
00110       timer (unsigned long v)
00111       {
00112         timer_ = v;
00113       }
00114 
00115     private:
00116       Message_ptr m_;
00117 
00118       unsigned long nak_count_;
00119       unsigned long timer_;
00120     };
00121 
00122   private:
00123     struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex>
00124     {
00125       typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base;
00126 
00127       // Should never be here but required by ACE_Hash_Blah_Blah.
00128       //
00129       Queue ()
00130           : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (0), max_sn_ (0)
00131       {
00132       }
00133 
00134       Queue (u64 sn)
00135         : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (sn), max_sn_ (sn)
00136       {
00137       }
00138 
00139       Queue (Queue const& q)
00140         : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (q.sn_), max_sn_ (sn_)
00141       {
00142         for (Queue::const_iterator i (q), e (q, 1); i != e; ++i)
00143         {
00144           bind ((*i).ext_id_, (*i).int_id_);
00145         }
00146       }
00147 
00148     public:
00149       int
00150       bind (u64 sn, Descr const& d)
00151       {
00152         int r (Base::bind (sn, d));
00153 
00154         if (r == 0 && sn > max_sn_) max_sn_ = sn;
00155 
00156         return r;
00157       }
00158 
00159       int
00160       rebind (u64 sn, Descr const& d)
00161       {
00162         int r (Base::rebind (sn, d));
00163 
00164         if (r == 0 && sn > max_sn_) max_sn_ = sn;
00165 
00166         return r;
00167       }
00168 
00169       int
00170       unbind (u64 sn)
00171       {
00172         int r (Base::unbind (sn));
00173 
00174         if (r == 0 && sn == max_sn_)
00175         {
00176           for (--max_sn_; max_sn_ >= sn_; --max_sn_)
00177           {
00178             if (find (max_sn_) == 0) break;
00179           }
00180         }
00181 
00182         return r;
00183       }
00184 
00185     public:
00186       u64
00187       sn () const
00188       {
00189         return sn_;
00190       }
00191 
00192       void
00193       sn (u64 sn)
00194       {
00195         sn_ = sn;
00196       }
00197 
00198       u64
00199       max_sn () const
00200       {
00201         if (current_size () == 0) return sn_;
00202 
00203         return max_sn_;
00204       }
00205 
00206     private:
00207       u64 sn_, max_sn_;
00208     };
00209 
00210     typedef
00211     ACE_Hash_Map_Manager_Ex<Address,
00212                             Queue,
00213                             AddressHasher,
00214                             ACE_Equal_To<Address>,
00215                             ACE_Null_Mutex>
00216     Map;
00217 
00218   private:
00219     void
00220     collapse (Queue& q);
00221 
00222     void
00223     track ();
00224 
00225     void
00226     track_queue (Address const& addr, Queue& q, Messages& msgs);
00227 
00228     Profile_ptr
00229     create_nrtm (u32 max_elem);
00230 
00231     static ACE_THR_FUNC_RETURN
00232     track_thunk (void* obj);
00233 
00234   private:
00235     Parameters const& params_;
00236 
00237     Map hold_;
00238     Mutex mutex_;
00239     Condition cond_;
00240 
00241     unsigned long nrtm_timer_;
00242 
00243     bool stop_;
00244     ACE_Thread_Manager tracker_mgr_;
00245   };
00246 }
00247 
00248 #endif  // ACE_RMCAST_ACKNOWLEDGE_H

Generated on Thu Nov 9 11:40:40 2006 for ACE_RMCast by doxygen 1.3.6