sequence_tracker.cpp

Go to the documentation of this file.
00001 #ifndef SEQUENCE_TRACKER_IMPLEMENTATION_FILE
00002 #define SEQUENCE_TRACKER_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : sequence_tracker                                                  *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "imp_sockets.h"
00019 #include "machine_uid.h"
00020 #include "sequence_tracker.h"
00021 
00022 #include <basis/function.h>
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/mutex.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <data_struct/int_hash.cpp>
00028 #include <mechanisms/time_stamp.h>
00029 
const int MAX_BITS_FOR_SEQ_HASH = 10;
  // bit count handed to each host's hash table of sequences; the table is
  // allowed 2^MAX_BITS_FOR_SEQ_HASH buckets.

const int CLEANING_SPAN = 20000;
  // when an incoming sequence number is more than this far beyond what we
  // have received so far, the tracker jumps ahead and runs a clean up.

const int MAX_ITEMS = 200;
  // cap on the number of sequences held per host.  kept deliberately small
  // so we are not hauling around thousands of indices.  connection oriented
  // traffic should rarely get near this; a broadcast style bus doing laggy
  // retransmissions is where it could start to matter.

#undef LOG
#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
  // all logging in this file goes to the program wide logger.
00045 
00046 //hmmm: we need to address when a host has:
00047 //        a) rolled over in sequences (not for 4 years at least?)
00048 //        b) changed its address where another host now has old address (which
00049 //           is also a weirdo sequence jump, maybe backwards, maybe not).
00050 //      would our timing help to guarantee that any oddness introduced is
00051 //      swamped out in a few minutes?  the worst thing would be a lost packet
00052 //      we dumped because we thought we'd seen it but hadn't.
00053 //      probably we will see a sequence that's really old seeming; should that
00054 //      be enough to trigger flushing the whole host?
00055 
00056 class sequence_record
00057 {
00058 public:
00059   int _sequence;  // the sequence number in question.
00060   time_stamp _when;  // when we received this sequence.
00061 
00062   sequence_record(int seq = 0) : _sequence(seq) {}
00063 
00064   istring text_form() const {
00065     return isprintf("seq=%d, time=", _sequence) + _when.text_form();
00066   }
00067 };
00068 
00070 
00071 class host_record
00072 {
00073 public:
00074   int _received_to;  // highest sequence we've got full reception prior to.
00075   machine_uid _host;  // the host we're concerned with.
00076   int_hash<sequence_record> _sequences;  // record of active sequences.
00077   time_stamp _last_active;  // the last time we heard from this host.
00078     // we could piece this together from the sequences but we prefer not to.
00079 
00080   host_record(const machine_uid &host)
00081   : _received_to(0), _host(host), _sequences(MAX_BITS_FOR_SEQ_HASH),
00082     _last_active()
00083   {}
00084 
00085   void clean_up(int coalesce_time) {
00086     // check sequences for being right next to the highest received sequence.
00087     // if they're one up, they can be collapsed without waiting for the aging
00088     // process.
00089     int_set ids;
00090     _sequences.ids(ids);
00091 
00092     // we restrict the size of the array with this block.
00093     if (ids.elements() > MAX_ITEMS) {
00094       int zap_point = ids.elements() - MAX_ITEMS;
00095         // we want to remove anything before this index.
00096       for (int s0 = 0; s0 < zap_point; s0++) {
00097         int seq = ids[s0];
00098         _sequences.zap(seq);
00099         // set our received_to value from the current element.
00100         if (_received_to < seq)
00101           _received_to = seq;
00102       }
00103       // now clean the list of our ids since they're gone.
00104       ids.zap(0, zap_point - 1);
00105     }
00106 
00107     for (int s1 = 0; s1 < ids.elements(); s1++) {
00108       sequence_record *seq;
00109       int id = ids[s1];
00110       if (!_sequences.find(id, seq)) continue;  // bad mistake going on.
00111       if (_received_to + 1 == seq->_sequence) {
00112         // we've hit one that can be collapsed.
00113         _received_to++;
00114         _sequences.zap(id);
00115         ids.zap(s1, s1);
00116         s1--;  // skip back before deleted item.
00117       }
00118     }
00119 
00120     // check sequence ages.  coalesce any older ones.
00121     for (int s2 = 0; s2 < ids.elements(); s2++) {
00122       sequence_record *seq;
00123       int id = ids[s2];
00124       if (!_sequences.find(id, seq)) continue;  // bad mistake going on.
00125       if (seq->_when < time_stamp(-coalesce_time)) {
00126         // this sequence number has floated too long; crush it now.
00127         if (_received_to < seq->_sequence)
00128           _received_to = seq->_sequence;  // update highest received seq.
00129         _sequences.zap(id);
00130       }
00131     }
00132   }
00133 
00134   istring text_form(bool verbose) {
00135     istring to_return;
00136     to_return += istring("host=") + _host.text_form()
00137         + isprintf(", rec_to=%d", _received_to)
00138         + ", active=" + _last_active.text_form();
00139     if (verbose) {
00140       int_set ids;
00141       _sequences.ids(ids);
00142       for (int i = 0; i < ids.elements(); i++) {
00143         sequence_record *found;
00144         if (!_sequences.find(ids[i], found))
00145           continue;  // that's a bad thing.
00146         to_return += istring(log_base::platform_ending()) + "\t"
00147             + found->text_form();
00148       }
00149     } else {
00150       to_return += isprintf(", sequences held=%d", _sequences.elements());
00151     }
00152     return to_return;
00153   }
00154 
00155 };
00156 
00158 
00159 //hmmm: should this be a hash table?
00160 
00161 class host_history : public amorph<host_record>
00162 {
00163 public:
00164   virtual ~host_history() {}
00165 
00166   IMPLEMENT_CLASS_NAME("host_history");
00167 
00168   int find_host(const machine_uid &to_find) {
00169     for (int i = 0; i < elements(); i++) {
00170       if (borrow(i)->_host == to_find) return i;
00171     }
00172     return common::NOT_FOUND;
00173   }
00174 
00175   bool whack_host(const machine_uid &to_find) {
00176     int indy = find_host(to_find);
00177     if (negative(indy)) return false;
00178     zap(indy, indy);
00179     return true;
00180   }
00181 
00182   void clean_up(int silence_time, int coalesce_time) {
00183     for (int h = 0; h < elements(); h++) {
00184       host_record &rec = *borrow(h);
00185       // check host liveliness.
00186       if (rec._last_active < time_stamp(-silence_time)) {
00187         // this host got too stale; whack it now.
00188         zap(h, h);
00189         h--;  // skip back to prior element.
00190         continue;
00191       }
00192       rec.clean_up(coalesce_time);
00193     }
00194   }
00195 
00196   bool add_sequence(const machine_uid &to_find, int sequence,
00197       int silence_time, int coalesce_time) {
00198     FUNCDEF("add_sequence");
00199     int indy = find_host(to_find);
00200     if (negative(indy)) {
00201       host_record *rec = new host_record(to_find);
00202       append(rec);
00203       indy = find_host(to_find);
00204       if (negative(indy)) {
00205         LOG(istring("*** failure to add a host to the tracker: ")
00206             + to_find.text_form());
00207         return false;  // serious error.
00208       }
00209     }
00210     host_record &rec = *borrow(indy);
00211     if (borrow(indy)->_received_to + 1 == sequence) {
00212       // this is just one up from our last received guy, so optimize it out.
00213       rec._received_to = sequence;
00214     } else if (sequence - borrow(indy)->_received_to > CLEANING_SPAN) {
00215       // if the number is wildly different, assume we haven't dealt with this
00216       // for too long.
00217       rec._received_to = sequence;
00218 #ifdef DEBUG_SEQUENCE_TRACKER
00219       LOG("sequence is wildly different, cleaning.");
00220 #endif
00221       clean_up(silence_time, coalesce_time);
00222     } else {
00223       // standard treatment, add it to the list.
00224       rec._sequences.add(sequence, new sequence_record(sequence));
00225       if (rec._sequences.elements() > MAX_ITEMS) {
00226         // too many sequences floating around now.  clean them up.
00227         clean_up(silence_time, coalesce_time);
00228       }
00229     }
00230     rec._last_active = time_stamp();
00231     return true;
00232   }
00233 
00234   istring text_form(bool verbose) {
00235     istring to_return;
00236     for (int i = 0; i < elements(); i++) {
00237       to_return += borrow(i)->text_form(verbose);
00238       if (i < elements() - 1)
00239         to_return += log_base::platform_ending();
00240     }
00241     return to_return;
00242   }
00243 
00244 };
00245 
00247 
00248 sequence_tracker::sequence_tracker(int coalesce_time, int silence_time)
00249 : _coalesce_time(coalesce_time),
00250   _silence_time(silence_time),
00251   _hosts(new host_history),
00252   _lock(new mutex)
00253 {
00254 }
00255 
00256 sequence_tracker::~sequence_tracker()
00257 {
00258   WHACK(_lock);
00259   WHACK(_hosts);
00260 }
00261 
00262 istring sequence_tracker::text_form(bool verbose) const
00263 {
00264   auto_synchronizer l(*_lock);
00265   return _hosts->text_form(verbose);
00266 }
00267 
00268 void sequence_tracker::add_pair(const machine_uid &host, int sequence)
00269 {
00270   auto_synchronizer l(*_lock);
00271   if (!_hosts->add_sequence(host, sequence, _silence_time, _coalesce_time)) {
00272 //complain? 
00273     return;
00274   }
00275 }
00276 
00277 bool sequence_tracker::have_seen(const machine_uid &host, int sequence)
00278 {
00279   auto_synchronizer l(*_lock);
00280   int indy = _hosts->find_host(host);
00281   if (negative(indy)) return false;
00282   host_record &rec = *_hosts->borrow(indy);
00283   if (sequence <= rec._received_to) return true;
00284   sequence_record *found;
00285   return !!rec._sequences.find(sequence, found);
00286 }
00287 
00288 void sequence_tracker::clean_up()
00289 {
00290   auto_synchronizer l(*_lock);
00291   _hosts->clean_up(_silence_time, _coalesce_time);
00292 }
00293 
00294 
00295 #endif //SEQUENCE_TRACKER_IMPLEMENTATION_FILE
00296 

Generated on Fri Nov 21 04:29:16 2008 for HOOPLE Libraries by  doxygen 1.5.1