00001 #ifndef SEQUENCE_TRACKER_IMPLEMENTATION_FILE
00002 #define SEQUENCE_TRACKER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "imp_sockets.h"
00019 #include "machine_uid.h"
00020 #include "sequence_tracker.h"
00021
00022 #include <basis/function.h>
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/mutex.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <data_struct/int_hash.cpp>
00028 #include <mechanisms/time_stamp.h>
00029
// bit count handed to the int_hash that holds each host's out-of-order
// sequence records (see the host_record constructor).
const int MAX_BITS_FOR_SEQ_HASH = 10;

// when a newly reported sequence number is more than this far beyond a
// host's "received to" mark, the mark jumps straight to the new number and a
// cleaning pass is run instead of storing the sequence individually.
const int CLEANING_SPAN = 20000;

// the most sequence records held for one host; going past this count
// triggers a cleaning pass, which discards the excess entries.
const int MAX_ITEMS = 200;

// LOG is redefined for this file to go through CLASS_EMERGENCY_LOG on the
// program-wide logger.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 class sequence_record
00057 {
00058 public:
00059 int _sequence;
00060 time_stamp _when;
00061
00062 sequence_record(int seq = 0) : _sequence(seq) {}
00063
00064 istring text_form() const {
00065 return isprintf("seq=%d, time=", _sequence) + _when.text_form();
00066 }
00067 };
00068
00070
00071 class host_record
00072 {
00073 public:
00074 int _received_to;
00075 machine_uid _host;
00076 int_hash<sequence_record> _sequences;
00077 time_stamp _last_active;
00078
00079
00080 host_record(const machine_uid &host)
00081 : _received_to(0), _host(host), _sequences(MAX_BITS_FOR_SEQ_HASH),
00082 _last_active()
00083 {}
00084
00085 void clean_up(int coalesce_time) {
00086
00087
00088
00089 int_set ids;
00090 _sequences.ids(ids);
00091
00092
00093 if (ids.elements() > MAX_ITEMS) {
00094 int zap_point = ids.elements() - MAX_ITEMS;
00095
00096 for (int s0 = 0; s0 < zap_point; s0++) {
00097 int seq = ids[s0];
00098 _sequences.zap(seq);
00099
00100 if (_received_to < seq)
00101 _received_to = seq;
00102 }
00103
00104 ids.zap(0, zap_point - 1);
00105 }
00106
00107 for (int s1 = 0; s1 < ids.elements(); s1++) {
00108 sequence_record *seq;
00109 int id = ids[s1];
00110 if (!_sequences.find(id, seq)) continue;
00111 if (_received_to + 1 == seq->_sequence) {
00112
00113 _received_to++;
00114 _sequences.zap(id);
00115 ids.zap(s1, s1);
00116 s1--;
00117 }
00118 }
00119
00120
00121 for (int s2 = 0; s2 < ids.elements(); s2++) {
00122 sequence_record *seq;
00123 int id = ids[s2];
00124 if (!_sequences.find(id, seq)) continue;
00125 if (seq->_when < time_stamp(-coalesce_time)) {
00126
00127 if (_received_to < seq->_sequence)
00128 _received_to = seq->_sequence;
00129 _sequences.zap(id);
00130 }
00131 }
00132 }
00133
00134 istring text_form(bool verbose) {
00135 istring to_return;
00136 to_return += istring("host=") + _host.text_form()
00137 + isprintf(", rec_to=%d", _received_to)
00138 + ", active=" + _last_active.text_form();
00139 if (verbose) {
00140 int_set ids;
00141 _sequences.ids(ids);
00142 for (int i = 0; i < ids.elements(); i++) {
00143 sequence_record *found;
00144 if (!_sequences.find(ids[i], found))
00145 continue;
00146 to_return += istring(log_base::platform_ending()) + "\t"
00147 + found->text_form();
00148 }
00149 } else {
00150 to_return += isprintf(", sequences held=%d", _sequences.elements());
00151 }
00152 return to_return;
00153 }
00154
00155 };
00156
00158
00159
00160
00161 class host_history : public amorph<host_record>
00162 {
00163 public:
00164 virtual ~host_history() {}
00165
00166 IMPLEMENT_CLASS_NAME("host_history");
00167
00168 int find_host(const machine_uid &to_find) {
00169 for (int i = 0; i < elements(); i++) {
00170 if (borrow(i)->_host == to_find) return i;
00171 }
00172 return common::NOT_FOUND;
00173 }
00174
00175 bool whack_host(const machine_uid &to_find) {
00176 int indy = find_host(to_find);
00177 if (negative(indy)) return false;
00178 zap(indy, indy);
00179 return true;
00180 }
00181
00182 void clean_up(int silence_time, int coalesce_time) {
00183 for (int h = 0; h < elements(); h++) {
00184 host_record &rec = *borrow(h);
00185
00186 if (rec._last_active < time_stamp(-silence_time)) {
00187
00188 zap(h, h);
00189 h--;
00190 continue;
00191 }
00192 rec.clean_up(coalesce_time);
00193 }
00194 }
00195
00196 bool add_sequence(const machine_uid &to_find, int sequence,
00197 int silence_time, int coalesce_time) {
00198 FUNCDEF("add_sequence");
00199 int indy = find_host(to_find);
00200 if (negative(indy)) {
00201 host_record *rec = new host_record(to_find);
00202 append(rec);
00203 indy = find_host(to_find);
00204 if (negative(indy)) {
00205 LOG(istring("*** failure to add a host to the tracker: ")
00206 + to_find.text_form());
00207 return false;
00208 }
00209 }
00210 host_record &rec = *borrow(indy);
00211 if (borrow(indy)->_received_to + 1 == sequence) {
00212
00213 rec._received_to = sequence;
00214 } else if (sequence - borrow(indy)->_received_to > CLEANING_SPAN) {
00215
00216
00217 rec._received_to = sequence;
00218 #ifdef DEBUG_SEQUENCE_TRACKER
00219 LOG("sequence is wildly different, cleaning.");
00220 #endif
00221 clean_up(silence_time, coalesce_time);
00222 } else {
00223
00224 rec._sequences.add(sequence, new sequence_record(sequence));
00225 if (rec._sequences.elements() > MAX_ITEMS) {
00226
00227 clean_up(silence_time, coalesce_time);
00228 }
00229 }
00230 rec._last_active = time_stamp();
00231 return true;
00232 }
00233
00234 istring text_form(bool verbose) {
00235 istring to_return;
00236 for (int i = 0; i < elements(); i++) {
00237 to_return += borrow(i)->text_form(verbose);
00238 if (i < elements() - 1)
00239 to_return += log_base::platform_ending();
00240 }
00241 return to_return;
00242 }
00243
00244 };
00245
00247
00248 sequence_tracker::sequence_tracker(int coalesce_time, int silence_time)
00249 : _coalesce_time(coalesce_time),
00250 _silence_time(silence_time),
00251 _hosts(new host_history),
00252 _lock(new mutex)
00253 {
00254 }
00255
00256 sequence_tracker::~sequence_tracker()
00257 {
00258 WHACK(_lock);
00259 WHACK(_hosts);
00260 }
00261
00262 istring sequence_tracker::text_form(bool verbose) const
00263 {
00264 auto_synchronizer l(*_lock);
00265 return _hosts->text_form(verbose);
00266 }
00267
00268 void sequence_tracker::add_pair(const machine_uid &host, int sequence)
00269 {
00270 auto_synchronizer l(*_lock);
00271 if (!_hosts->add_sequence(host, sequence, _silence_time, _coalesce_time)) {
00272
00273 return;
00274 }
00275 }
00276
00277 bool sequence_tracker::have_seen(const machine_uid &host, int sequence)
00278 {
00279 auto_synchronizer l(*_lock);
00280 int indy = _hosts->find_host(host);
00281 if (negative(indy)) return false;
00282 host_record &rec = *_hosts->borrow(indy);
00283 if (sequence <= rec._received_to) return true;
00284 sequence_record *found;
00285 return !!rec._sequences.find(sequence, found);
00286 }
00287
00288 void sequence_tracker::clean_up()
00289 {
00290 auto_synchronizer l(*_lock);
00291 _hosts->clean_up(_silence_time, _coalesce_time);
00292 }
00293
00294
00295 #endif //SEQUENCE_TRACKER_IMPLEMENTATION_FILE
00296