00001 #ifndef ZING_TABLE_IMPLEMENTATION_FILE
00002 #define ZING_TABLE_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "event_record.h"
00019 #include "zing_table.h"
00020
00021 #include <basis/auto_synch.h>
00022 #include <basis/mutex.h>
00023 #include <data_struct/amorph.cpp>
00024 #include <data_struct/int_hash.cpp>
00025 #include <data_struct/memory_limiter.h>
00026 #include <mechanisms/heartbeat.h>
00027 #include <mechanisms/time_stamp.h>
00028 #include <opsystem/system_values.h>
00029 #include <textual/string_manipulation.h>
00030
00031 using namespace basis;
00032
00033
00034
00035
00036 #undef LOG
00037 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00038
00039 const int ZING_COMPLAINT_INTERVAL = 2 * MINUTE_ms;
00040
00041
00043
00044 class queued_zings : public amorph<event_record> {};
00045
00047
00048 class zing_queue_amorph : public int_hash<queued_zings>
00049 {
00050 public:
00051 zing_queue_amorph() : int_hash<queued_zings>(10) {}
00052
00053
00054 };
00055
00057
00058 zing_table::zing_table(int overall_memory, int per_user_memory)
00059 : _clients(new zing_queue_amorph),
00060 _lock(new mutex),
00061 _limiter(new memory_limiter(overall_memory, per_user_memory))
00062 {}
00063
00064 zing_table::~zing_table()
00065 {
00066 WHACK(_lock);
00067 WHACK(_clients);
00068 WHACK(_limiter);
00069 }
00070
00071 bool zing_table::add_client(int new_id)
00072 {
00073 auto_synchronizer l(*_lock);
00074 queued_zings *cli = _clients->find(new_id);
00075 if (cli) return false;
00076 queued_zings *new_entry = new queued_zings;
00077 _clients->add(new_id, new_entry);
00078
00079 return true;
00080 }
00081
00082 bool zing_table::remove_client(int outgoing_id)
00083 {
00084 auto_synchronizer l(*_lock);
00085 queued_zings *cli = _clients->find(outgoing_id);
00086 if (!cli) return false;
00087 _clients->zap(outgoing_id);
00088 return true;
00089 }
00090
00091 void zing_table::zing_event(int client_id, int event, const byte_array &data)
00092 {
00093 zing_event(client_id, event, data, int_set());
00094 }
00095
00096 void zing_table::zing_event(int client_id, int event, const byte_array &data,
00097 const int_set &targets)
00098 {
00099 FUNCDEF("zing_event");
00100 auto_synchronizer l(*_lock);
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113 event_record to_add(client_id, event, data, targets);
00114 byte_array packed_blob;
00115 to_add.pack(packed_blob);
00116
00117 int size_guess = sizeof(event_record) + packed_blob.length()
00118 + 10 * sizeof(int) + (targets.length() + 1) * sizeof(int);
00119 if (!_limiter->okay_allocation(client_id, size_guess)) {
00120
00121 static time_stamp last_complaint(-ZING_COMPLAINT_INTERVAL * 2);
00122
00123 if (time_stamp().value() - last_complaint.value()
00124 > ZING_COMPLAINT_INTERVAL) {
00125 LOG(isprintf("data overflow on client %d!", client_id));
00126 last_complaint = time_stamp();
00127 }
00128 return;
00129 }
00130
00131 queued_zings *cab = _clients->find(client_id);
00132 if (!cab) return;
00133
00134 event_record *to_store
00135 = new event_record(client_id, event, packed_blob, targets);
00136 cab->append(to_store);
00137 }
00138
00139 int zing_table::show_events(int client_id, const istring &name,
00140 istring &output, int indentation)
00141 {
00142 istring indent = string_manipulation::indentation(indentation);
00143 int events_printed = 0;
00144
00145 auto_synchronizer l(*_lock);
00146 queued_zings *cab = _clients->find(client_id);
00147 if (!cab) return 0;
00148
00149 system_values event_namer(system_values::EVENT_VALUES());
00150 for (int i = 0; i < cab->elements(); i++) {
00151 event_record *w = cab->borrow(i);
00152 if (w) {
00153 istring event_name;
00154 istring descrip;
00155 istring file_loc;
00156 event_namer.lookup(w->_event, event_name, descrip, file_loc);
00157
00158 if (!event_name)
00159 event_name = isprintf("%d", w->_event);
00160 output += indent + isprintf("%s with id %d has an event \"%s\" "
00161 "listed (%d bytes).", name.s(), client_id, event_name.s(),
00162 w->_data.length()) + log_base::platform_ending();
00163 events_printed++;
00164 }
00165 }
00166 return events_printed;
00167 }
00168
00169 bool zing_table::pop_event(int client_id, int &event, byte_array &data,
00170 int_set &targets)
00171 {
00172 FUNCDEF("pop_event");
00173 event = 0;
00174 data = NIL;
00175
00176 auto_synchronizer l(*_lock);
00177 queued_zings *cab = _clients->find(client_id);
00178 if (!cab) return false;
00179
00180 if (!cab->elements()) return false;
00181 event_record *w = cab->acquire(0);
00182 cab->zap(0, 0);
00183 if (w) {
00184 event = w->_event;
00185 data = w->_data;
00186 targets = w->_targets;
00187
00188 int size_guess = sizeof(event_record) + w->_data.length()
00189 + 10 * sizeof(int) + (w->_targets.length() + 1) * sizeof(int);
00190 if (!_limiter->record_deletion(client_id, size_guess)) {
00191 LOG("logic error in zing size assessment.");
00192 }
00193
00194 return true;
00195 }
00196 return false;
00197 }
00198
00199 bool zing_table::peek_event(int client_id, int &event, int &data_length,
00200 int_set &targets) const
00201 {
00202 event = 0;
00203 data_length = 0;
00204
00205 auto_synchronizer l(*_lock);
00206 queued_zings *cab = _clients->find(client_id);
00207 if (!cab) return false;
00208
00209 const event_record *w = cab->borrow(0);
00210 if (w) {
00211 event = w->_event;
00212 data_length = w->_data.length();
00213 targets = w->_targets;
00214 return true;
00215 }
00216 return false;
00217 }
00218
00219 void zing_table::flush_events(int client_id)
00220 {
00221 auto_synchronizer l(*_lock);
00222 queued_zings *cab = _clients->find(client_id);
00223 if (!cab) return;
00224 cab->reset();
00225 }
00226
00227 bool zing_table::pop_events(int client_id, int &count, byte_array &event_data,
00228 int max_size)
00229 {
00230 FUNCDEF("pop_events");
00231 #ifdef DEBUG_ZINGABLE
00232 LOG(istring(istring::SPRINTF, "coming in with max len of %d.", max_size));
00233 #endif
00234 count = 0;
00235
00236
00237
00238 queued_zings data_holder;
00239 {
00240 auto_synchronizer l(*_lock);
00241 queued_zings *cab = _clients->find(client_id);
00242 if (!cab) return false;
00243 if (!cab->elements()) return false;
00244
00245 for (int i = 0; i < cab->elements(); i++) {
00246 data_holder.append(cab->acquire(i));
00247 }
00248 cab->reset();
00249 }
00250
00251 int holder = 0;
00252 int start_position = event_data.length();
00253 bool added_holder = false;
00254 int index;
00255 for (index = 0; index < data_holder.elements(); index++) {
00256 event_record *found = data_holder.borrow(index);
00257 if (!found) {
00258 data_holder.zap(index, index);
00259 index--;
00260 continue;
00261 }
00262
00263 int extra_add = added_holder? 0 : sizeof(int);
00264 if (max_size
00265 && (extra_add + event_data.length() + found->_data.length()
00266 > max_size))
00267 break;
00268 if (!added_holder) {
00269 attach(event_data, holder);
00270 added_holder = true;
00271 }
00272 count++;
00273 #ifdef DEBUG_ZINGABLE
00274 LOG(isprintf("for cli %d: adding event=%d cli=%d ev=%d len=%d.",
00275 client_id, count, found->_id, found->_event, found->_data.length()));
00276 #endif
00277 event_data += found->_data;
00278
00279
00280 int size_guess = sizeof(event_record) + found->_data.length()
00281 + 10 * sizeof(int) + (found->_targets.length() + 1) * sizeof(int);
00282 {
00283 auto_synchronizer l(*_lock);
00284 if (!_limiter->record_deletion(client_id, size_guess)) {
00285 LOG("logic error in zing size assessment.");
00286 }
00287 }
00288 }
00289 if (added_holder) {
00290
00291 byte_array to_replace_holder;
00292 attach(to_replace_holder, count);
00293 event_data.overwrite(start_position, to_replace_holder);
00294 }
00295
00296 if (index)
00297 data_holder.zap(0, index - 1);
00298
00299
00300 if (data_holder.elements()) {
00301 auto_synchronizer l(*_lock);
00302 queued_zings *cab = _clients->find(client_id);
00303 if (cab) {
00304
00305 cab->insert(0, data_holder.elements());
00306
00307 for (int i = 0; i < data_holder.elements(); i++) {
00308 cab->put(i, data_holder.acquire(i));
00309 }
00310 }
00311 data_holder.reset();
00312 }
00313
00314 return count > 0;
00315 }
00316
00317 bool zing_table::peek_events(int client_id, int &count,
00318 int &data_length) const
00319 {
00320 count = 0;
00321 data_length = 0;
00322
00323 auto_synchronizer l(*_lock);
00324 queued_zings *cab = _clients->find(client_id);
00325 if (!cab) return false;
00326
00327 if (!cab->elements()) return false;
00328 data_length = sizeof(int);
00329 for (int i = 0; i < cab->elements(); i++) {
00330 const event_record *found = cab->borrow(i);
00331 if (found) {
00332 count++;
00333 data_length += found->_data.length();
00334 }
00335 }
00336 return count > 0;
00337 }
00338
00339
00340 #endif //ZING_TABLE_IMPLEMENTATION_FILE
00341