zing_table.cpp

Go to the documentation of this file.
00001 #ifndef ZING_TABLE_IMPLEMENTATION_FILE
00002 #define ZING_TABLE_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : zing_table                                                        *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 1996-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "event_record.h"
00019 #include "zing_table.h"
00020 
00021 #include <basis/auto_synch.h>
00022 #include <basis/mutex.h>
00023 #include <data_struct/amorph.cpp>
00024 #include <data_struct/int_hash.cpp>
00025 #include <data_struct/memory_limiter.h>
00026 #include <mechanisms/heartbeat.h>
00027 #include <mechanisms/time_stamp.h>
00028 #include <opsystem/system_values.h>
00029 #include <textual/string_manipulation.h>
00030 
00031 using namespace basis;
00032 
00033 //#define DEBUG_ZING_TABLE
00034   // uncomment if you want noisy zinging.
00035 
00036 #undef LOG
00037 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00038 
00039 const int ZING_COMPLAINT_INTERVAL = 2 * MINUTE_ms;
00040   // the frequency at which we complain about overflows on zinging.
00041 
00043 
00044 class queued_zings : public amorph<event_record> {};
00045 
00047 
00048 class zing_queue_amorph : public int_hash<queued_zings>
00049 {
00050 public:
00051   zing_queue_amorph() : int_hash<queued_zings>(10) {}
00052     // 1024 has slots.
00053 //hmmm: is that too many?
00054 };
00055 
00057 
00058 zing_table::zing_table(int overall_memory, int per_user_memory)
00059 : _clients(new zing_queue_amorph),
00060   _lock(new mutex),
00061   _limiter(new memory_limiter(overall_memory, per_user_memory))
00062 {}
00063 
00064 zing_table::~zing_table()
00065 {
00066   WHACK(_lock);
00067   WHACK(_clients);
00068   WHACK(_limiter);
00069 }
00070 
00071 bool zing_table::add_client(int new_id)
00072 {
00073   auto_synchronizer l(*_lock);
00074   queued_zings *cli = _clients->find(new_id);
00075   if (cli) return false;  // already exists.
00076   queued_zings *new_entry = new queued_zings;
00077   _clients->add(new_id, new_entry);
00078 //check result if it matters whether this was new or existing.
00079   return true;
00080 }
00081 
00082 bool zing_table::remove_client(int outgoing_id)
00083 {
00084   auto_synchronizer l(*_lock);
00085   queued_zings *cli = _clients->find(outgoing_id);
00086   if (!cli) return false;  // not found for that id.
00087   _clients->zap(outgoing_id);
00088   return true;
00089 }
00090 
00091 void zing_table::zing_event(int client_id, int event, const byte_array &data)
00092 {
00093   zing_event(client_id, event, data, int_set());
00094 }
00095 
00096 void zing_table::zing_event(int client_id, int event, const byte_array &data,
00097     const int_set &targets)
00098 {
00099   FUNCDEF("zing_event");
00100   auto_synchronizer l(*_lock);
00101 
00102 /*
00103 {
00104 istring targs;
00105 for (int n = 0; n < targets.elements(); n++)
00106 targs += isprintf("%d ", targets[n]);
00107 if (targs.t()) LOG(istring("targets passed in: ") + targs);
00108 }
00109 */
00110 
00111   // we always repack the data in the event structure because that's the
00112   // known protocol.  no one should be pre-packing these structures.
00113   event_record to_add(client_id, event, data, targets);
00114   byte_array packed_blob;
00115   to_add.pack(packed_blob);
00116 
00117   int size_guess = sizeof(event_record) + packed_blob.length()
00118       + 10 * sizeof(int) + (targets.length() + 1) * sizeof(int);
00119   if (!_limiter->okay_allocation(client_id, size_guess)) {
00120     // complain that we're not allowing this, but not too frequently.
00121 static time_stamp last_complaint(-ZING_COMPLAINT_INTERVAL * 2);
00122 //move to object header; safe for now since we're synched.
00123     if (time_stamp().value() - last_complaint.value()
00124         > ZING_COMPLAINT_INTERVAL) {
00125       LOG(isprintf("data overflow on client %d!", client_id));
00126       last_complaint = time_stamp();
00127     }
00128     return;
00129   }
00130 
00131   queued_zings *cab = _clients->find(client_id);
00132   if (!cab) return;  // not found for that id.
00133 
00134   event_record *to_store
00135       = new event_record(client_id, event, packed_blob, targets);
00136   cab->append(to_store);
00137 }
00138 
00139 int zing_table::show_events(int client_id, const istring &name,
00140     istring &output, int indentation)
00141 {
00142   istring indent = string_manipulation::indentation(indentation);
00143   int events_printed = 0;
00144 
00145   auto_synchronizer l(*_lock);
00146   queued_zings *cab = _clients->find(client_id);
00147   if (!cab) return 0;  // not found for that id.
00148 
00149   system_values event_namer(system_values::EVENT_VALUES());
00150   for (int i = 0; i < cab->elements(); i++) {
00151     event_record *w = cab->borrow(i);
00152     if (w) {
00153       istring event_name;
00154       istring descrip;
00155       istring file_loc;
00156       event_namer.lookup(w->_event, event_name, descrip, file_loc);
00157       // finds a name for the event, if possible.
00158       if (!event_name)
00159         event_name = isprintf("%d", w->_event);
00160       output += indent + isprintf("%s with id %d has an event \"%s\" "
00161           "listed (%d bytes).", name.s(), client_id, event_name.s(),
00162           w->_data.length()) + log_base::platform_ending();
00163       events_printed++;
00164     }
00165   }
00166   return events_printed;
00167 }
00168 
00169 bool zing_table::pop_event(int client_id, int &event, byte_array &data,
00170     int_set &targets)
00171 {
00172   FUNCDEF("pop_event");
00173   event = 0;
00174   data = NIL;
00175 
00176   auto_synchronizer l(*_lock);
00177   queued_zings *cab = _clients->find(client_id);
00178   if (!cab) return false;  // not found for that id.
00179 
00180   if (!cab->elements()) return false;
00181   event_record *w = cab->acquire(0);
00182   cab->zap(0, 0);
00183   if (w) {
00184     event = w->_event;
00185     data = w->_data;
00186     targets = w->_targets;
00187 
00188     int size_guess = sizeof(event_record) + w->_data.length()
00189         + 10 * sizeof(int) + (w->_targets.length() + 1) * sizeof(int);
00190     if (!_limiter->record_deletion(client_id, size_guess)) {
00191       LOG("logic error in zing size assessment.");
00192     }
00193 
00194     return true;
00195   }
00196   return false;
00197 }
00198 
00199 bool zing_table::peek_event(int client_id, int &event, int &data_length,
00200     int_set &targets) const
00201 {
00202   event = 0;
00203   data_length = 0;
00204 
00205   auto_synchronizer l(*_lock);
00206   queued_zings *cab = _clients->find(client_id);
00207   if (!cab) return false;  // not found for that id.
00208 
00209   const event_record *w = cab->borrow(0);
00210   if (w) {
00211     event = w->_event;
00212     data_length = w->_data.length();
00213     targets = w->_targets;
00214     return true;
00215   }
00216   return false;
00217 }
00218 
00219 void zing_table::flush_events(int client_id)
00220 {
00221   auto_synchronizer l(*_lock);
00222   queued_zings *cab = _clients->find(client_id);
00223   if (!cab) return;  // not found for that id.
00224   cab->reset();
00225 }
00226 
00227 bool zing_table::pop_events(int client_id, int &count, byte_array &event_data,
00228     int max_size)
00229 {
00230   FUNCDEF("pop_events");
00231 #ifdef DEBUG_ZINGABLE
00232   LOG(istring(istring::SPRINTF, "coming in with max len of %d.", max_size));
00233 #endif
00234   count = 0;
00235 
00236 //hmmm, what about grabbing all of it now?
00237 
00238   queued_zings data_holder;
00239   {
00240     auto_synchronizer l(*_lock);
00241     queued_zings *cab = _clients->find(client_id);
00242     if (!cab) return false;  // not found for that id.
00243     if (!cab->elements()) return false;  // nothing waiting.
00244     // move all pending items into the holder.
00245     for (int i = 0; i < cab->elements(); i++) {
00246       data_holder.append(cab->acquire(i));
00247     }
00248     cab->reset();  // dump all index storage.
00249   }
00250 
00251   int holder = 0;
00252   int start_position = event_data.length();  // where we're adding.
00253   bool added_holder = false;  // did we add the holder yet?
00254   int index;  // track the position so we can clear what we consumed.
00255   for (index = 0; index < data_holder.elements(); index++) {
00256     event_record *found = data_holder.borrow(index);
00257     if (!found) {
00258       data_holder.zap(index, index);
00259       index--;  // skip back for the missing one.
00260       continue;
00261     }
00262     // if we didn't attach the holder yet, we need to consider that we will.
00263     int extra_add = added_holder? 0 : sizeof(int);
00264     if (max_size
00265         && (extra_add + event_data.length() + found->_data.length()
00266             > max_size))
00267       break;
00268     if (!added_holder) {
00269       attach(event_data, holder);
00270       added_holder = true;
00271     }
00272     count++;
00273 #ifdef DEBUG_ZINGABLE
00274     LOG(isprintf("for cli %d: adding event=%d cli=%d ev=%d len=%d.",
00275         client_id, count, found->_id, found->_event, found->_data.length()));
00276 #endif
00277     event_data += found->_data;
00278 
00279 //hmmm: move this into the ipc event structure object!
00280     int size_guess = sizeof(event_record) + found->_data.length()
00281         + 10 * sizeof(int) + (found->_targets.length() + 1) * sizeof(int);
00282     {
00283       auto_synchronizer l(*_lock);
00284       if (!_limiter->record_deletion(client_id, size_guess)) {
00285         LOG("logic error in zing size assessment.");
00286       }
00287     }
00288   }
00289   if (added_holder) {
00290     // store the number of events over the placeholder, if one's in there.
00291     byte_array to_replace_holder;
00292     attach(to_replace_holder, count);
00293     event_data.overwrite(start_position, to_replace_holder);
00294   }
00295   // if we chowed anything, remove it from the list.
00296   if (index)
00297     data_holder.zap(0, index - 1);
00298 
00299   // if we didn't clear the list, then we need to put the extras back.
00300   if (data_holder.elements()) {
00301     auto_synchronizer l(*_lock);
00302     queued_zings *cab = _clients->find(client_id);
00303     if (cab) {
00304       // move all remaining pending items back into the cabinet.
00305       cab->insert(0, data_holder.elements());
00306         // add space for these items.
00307       for (int i = 0; i < data_holder.elements(); i++) {
00308         cab->put(i, data_holder.acquire(i));
00309       }
00310     }
00311     data_holder.reset();  // clear for the heck of it.
00312   }
00313 
00314   return count > 0;
00315 }
00316 
00317 bool zing_table::peek_events(int client_id, int &count,
00318     int &data_length) const
00319 {
00320   count = 0;
00321   data_length = 0;
00322 
00323   auto_synchronizer l(*_lock);
00324   queued_zings *cab = _clients->find(client_id);
00325   if (!cab) return false;  // not found for that id.
00326 
00327   if (!cab->elements()) return false;  // nothing waiting.
00328   data_length = sizeof(int);  // initially just the count size.
00329   for (int i = 0; i < cab->elements(); i++) {
00330     const event_record *found = cab->borrow(i);
00331     if (found) {
00332       count++;
00333       data_length += found->_data.length();
00334     }
00335   }
00336   return count > 0;
00337 }
00338 
00339 
00340 #endif //ZING_TABLE_IMPLEMENTATION_FILE
00341 

Generated on Fri Nov 28 04:28:55 2008 for HOOPLE Libraries by  doxygen 1.5.1