entity_data_bin.cpp

Go to the documentation of this file.
00001 #ifndef ENTITY_DATA_BIN_IMPLEMENTATION_FILE
00002 #define ENTITY_DATA_BIN_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : entity_data_bin                                                   *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "entity_data_bin.h"
00019 #include "entity_defs.h"
00020 #include "infoton.h"
00021 #include "tentacle.h"
00022 
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/mutex.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <data_struct/string_hash.cpp>
00028 #include <mechanisms/time_stamp.h>
00029 
00030 //#define DEBUG_ENTITY_DATA_BIN
00031   // uncomment for more debugging information.
00032 
// declares an auto_synchronizer named "l" on the bin's _ent_lock member;
// presumably this holds the mutex until the enclosing scope ends.
#undef GRAB_LOCK
#define GRAB_LOCK auto_synchronizer l(*_ent_lock)

// sends "s" through CLASS_EMERGENCY_LOG using the program-wide logger.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00039 
00040 const int OCTOPUS_TABLE_BITS = 6;
00041   // the hash table for items will have 2^N entries.
00042 
00043 //hmmm: parameterize in class interface?
00044 const int DATA_DECAY_INTERVAL = 4 * MINUTE_ms;
00045   // if we haven't gotten a data item out to its entity in this long, then
00046   // we assume the entity has croaked or doesn't want its data.
00047 
00049 
00050 class infoton_holder
00051 {
00052 public:
00053   infoton *_item;      // the data making up the production.
00054   octopus_request_id _id;  // the id, if any, of the original request.
00055   time_stamp _when_added;  // when the data became available.
00056 
00057   infoton_holder(const octopus_request_id &id = octopus_request_id(),
00058       infoton *item = NIL)
00059   : _item(item), _id(id), _when_added() {}
00060 
00061   ~infoton_holder() { WHACK(_item); }
00062 
00063   istring text_form() const {
00064     return istring("id=") + _id.text_form() + ", added="
00065         + _when_added.text_form() + ", item="
00066         + _item->classifier().text_form() + ", data="
00067         + _item->text_form();
00068   }
00069 };
00070 
00072 
00073 class entity_basket : public amorph<infoton_holder>
00074 {
00075 public:
00076   time_stamp _last_active;
00077 
00078   istring text_form() const {
00079     istring to_return;
00080     for (int i = 0; i < elements(); i++)
00081       to_return += get(i)->text_form() + log_base::platform_ending();
00082     return to_return;
00083   }
00084 };
00085 
00087 
00088 class entity_hasher : public hashing_algorithm
00089 {
00090 public:
00091   virtual hashing_algorithm *clone() const { return new entity_hasher; }
00092 
00093   virtual u_int hash(const void *key_data, int formal(key_length)) const {
00094     octopus_entity *key = (octopus_entity *)key_data;
00095     // jiggle the pieces of the id into a number.
00096     return u_int(
00097         key->process_id()
00098         + (key->add_in() << 10)
00099         + (key->sequencer() << 14)
00100         + (key->hostname()[0] << 20)
00101         + (key->hostname()[1] << 24) );
00102   }
00103 };
00104 
00106 
00107 class entity_item_hash
00108 : public hash_table<octopus_entity, entity_basket>
00109 {
00110 public:
00111   entity_item_hash(const entity_hasher &hash)
00112   : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
00113   {}
00114 };
00115 
00117 
00118 class basketcase : public basis::set<octopus_entity>
00119 {
00120 public:
00121 };
00122 
00124 
00125 // used for our apply methods for communicating back to the caller.
00126 struct apply_struct
00127 {
00128   basketcase *_empty_baskets;
00129   entity_basket *_any_item;
00130   int &_items_held;  // hooks to parent's item count.
00131 
00132   apply_struct(int &items_held)
00133       : _empty_baskets(NIL), _any_item(NIL), _items_held(items_held) {}
00134 };
00135 
00137 
00138 entity_data_bin::entity_data_bin(int max_size_per_entity)
00139 : _table(new entity_item_hash(entity_hasher())),
00140   _ent_lock(new mutex),
00141   _action_count(0),
00142   _max_per_ent(max_size_per_entity),
00143   _items_held(0)
00144 {}
00145 
00146 entity_data_bin::~entity_data_bin()
00147 {
00148   WHACK(_table);
00149   WHACK(_ent_lock);
00150 }
00151 
00152 int entity_data_bin::entities() const
00153 {
00154   GRAB_LOCK;
00155   return _table->elements();
00156 }
00157 
00158 struct text_form_accumulator { istring _accum; };
00159 
00160 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
00161     void *data_link)
00162 {
00163   text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
00164   shuttle->_accum += bask.text_form();
00165   return true;
00166 }
00167 
00168 istring entity_data_bin::text_form() const
00169 {
00170   GRAB_LOCK;
00171   text_form_accumulator shuttle;
00172   _table->apply(text_form_applier, &shuttle);
00173   return shuttle._accum;
00174 }
00175 
00176 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
00177     void *data_link)
00178 {
00179   #undef static_class_name
00180   #define static_class_name() "entity_data_bin"
00181   FUNCDEF("scramble_applier");
00182   int *county = (int *)data_link;
00183   *county += bask.elements();
00184   return true;
00185   #undef static_class_name
00186 }
00187 
00188 // this could be extended to do more interesting checks also; currently it's
00189 // just like the entities() method really.
00190 int entity_data_bin::scramble_counter()
00191 {
00192   int count = 0;
00193   _table->apply(scramble_applier, &count);
00194   return count;
00195 }
00196 
#ifdef DEBUG_ENTITY_DATA_BIN
  // on every hundredth use, counts the items in the table and logs the total.
  // the body is wrapped in do/while(0) so that "DUMP_STATE;" is exactly one
  // statement and cannot capture an "else" that follows it.
  #define DUMP_STATE \
    do { \
      if ( !(_action_count++ % 100) ) { \
        int items = scramble_counter(); \
        LOG(isprintf("-> %d items counted.", items)); \
      } \
    } while (0)
#else
  // debugging is off, so the macro expands to nothing.
  #define DUMP_STATE
#endif
00206 
00207 bool entity_data_bin::add_item(infoton *to_add,
00208     const octopus_request_id &orig_id)
00209 {
00210   FUNCDEF("add_item");
00211   GRAB_LOCK;
00212   // create a record to add to the appropriate bin.
00213   infoton_holder *holder = new infoton_holder(orig_id, to_add);
00214 
00215   // see if a basket already exists for the entity.
00216   entity_basket *bask = _table->find(orig_id._entity);
00217   if (!bask) {
00218     // this entity doesn't have a basket so add one.
00219     bask = new entity_basket;
00220     _table->add(orig_id._entity, bask);
00221   }
00222 
00223   bask->_last_active = time_stamp();  // reset activity time.
00224 
00225   // count up the current amount of data in use.
00226   int current_size = 0;
00227   for (int i = 0; i < bask->elements(); i++)
00228     current_size += bask->borrow(i)->_item->packed_size();
00229 
00230   if (current_size + to_add->packed_size() > _max_per_ent) {
00231     WHACK(holder);
00232     return false;
00233   }
00234   
00235   // append the latest production to the list.
00236   bask->append(holder);
00237   _items_held++;
00238   return true;
00239 }
00240 
00241 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
00242     void *data_link)
00243 {
00244   #define static_class_name() "entity_data_bin"
00245   FUNCDEF("any_item_applier");
00246   apply_struct *apple = (apply_struct *)data_link;
00247   // check the basket to see if it has any items.
00248   if (!bask.elements()) {
00249 #ifdef DEBUG_ENTITY_DATA_BIN
00250     LOG(istring("saw empty basket ") + key.mangled_form());
00251 #endif
00252     return true;  // continue iterating.
00253   }
00254   apple->_any_item = &bask;
00255   return false;  // stop iteration.
00256   #undef static_class_name
00257 }
00258 
00259 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
00260 {
00261   FUNCDEF("acquire_for_any");
00262   GRAB_LOCK;
00263   apply_struct apple(_items_held);
00264   _table->apply(any_item_applier, &apple);
00265   if (!apple._any_item) return NIL;
00266   DUMP_STATE;
00267   // retrieve the information from our basket that was provided.
00268   infoton_holder *found = apple._any_item->acquire(0);
00269   apple._any_item->zap(0, 0);
00270   if (!apple._any_item->elements()) {
00271     // toss this empty basket.
00272 #ifdef DEBUG_ENTITY_DATA_BIN
00273     LOG(istring("tossing empty basket ") + found->_id._entity.mangled_form());
00274 #endif
00275     _table->zap(found->_id._entity);
00276   }
00277   apple._any_item = NIL;
00278   infoton *to_return = found->_item;
00279   id = found->_id;
00280   found->_item = NIL;  // clear so it won't be whacked.
00281   WHACK(found);
00282   _items_held--;
00283 //#ifdef DEBUG_ENTITY_DATA_BIN
00284   if (_items_held < 0)
00285     LOG("logic error: number of items went below zero.");
00286 //#endif
00287   return to_return;
00288 }
00289 
00290 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00291     infoton_list &items, int maximum_size)
00292 {
00293   FUNCDEF("acquire_for_entity [multiple]");
00294   items.reset();
00295   if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
00296     // pick a reasonable default.
00297   octopus_request_id id;
00298   int items_found = 0;
00299   while (maximum_size > 0) {
00300     infoton *inf = acquire_for_entity(requester, id);
00301     if (!inf)
00302       break;  // none left.
00303     items.append(new infoton_id_pair(inf, id));    
00304     maximum_size -= inf->packed_size();
00305     items_found++;
00306   }
00307   return items_found;
00308 }
00309 
00310 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00311     octopus_request_id &id)
00312 {
00313   FUNCDEF("acquire_for_entity [single]");
00314   id = octopus_request_id();  // reset it.
00315   GRAB_LOCK;
00316   infoton *to_return = NIL;
00317   entity_basket *bask = _table->find(requester);
00318   if (!bask) {
00319     return NIL;
00320   }
00321   if (!bask->elements()) {
00322 #ifdef DEBUG_ENTITY_DATA_BIN
00323     LOG(istring("tossing empty basket ") + requester.mangled_form());
00324 #endif
00325     _table->zap(requester);
00326     return NIL;
00327   }
00328   DUMP_STATE;
00329   id = bask->get(0)->_id;
00330   to_return = bask->borrow(0)->_item;
00331   bask->borrow(0)->_item = NIL;
00332   bask->zap(0, 0);
00333   if (!bask->elements()) {
00334 #ifdef DEBUG_ENTITY_DATA_BIN
00335     LOG(istring("tossing empty basket ") + requester.mangled_form());
00336 #endif
00337     _table->zap(requester);
00338   }
00339   _items_held--;
00340 //#ifdef DEBUG_ENTITY_DATA_BIN
00341   if (_items_held < 0)
00342     LOG("logic error: number of items went below zero.");
00343 //#endif
00344   return to_return;
00345 }
00346 
00347 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
00348 {
00349   FUNCDEF("acquire_for_identifier");
00350   infoton *to_return = NIL;
00351   GRAB_LOCK;
00352   entity_basket *bask = _table->find(id._entity);
00353   if (!bask) return NIL;
00354   if (!bask->elements()) {
00355 #ifdef DEBUG_ENTITY_DATA_BIN
00356     LOG(istring("tossing empty basket ") + id._entity.mangled_form());
00357 #endif
00358     _table->zap(id._entity);
00359     return NIL;
00360   }
00361   for (int i = 0; i < bask->elements(); i++) {
00362     if (bask->get(i)->_id == id) {
00363       to_return = bask->borrow(i)->_item;  // snag the item.
00364       bask->borrow(i)->_item = NIL;  // clear the list's version out.
00365       bask->zap(i, i);  // whack the sanitized element.
00366       DUMP_STATE;
00367       if (!bask->elements()) {
00368 #ifdef DEBUG_ENTITY_DATA_BIN
00369         LOG(istring("tossing empty basket ") + id._entity.mangled_form());
00370 #endif
00371         _table->zap(id._entity);
00372       }
00373       _items_held--;
00374 //#ifdef DEBUG_ENTITY_DATA_BIN
00375       if (_items_held < 0)
00376         LOG("logic error: number of items went below zero.");
00377 //#endif
00378       return to_return;
00379     }
00380   }
00381   return NIL;
00382 }
00383 
00384 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
00385     void *data_link)
00386 {
00387   #define static_class_name() "entity_data_bin"
00388   FUNCDEF("cleaning_applier");
00389   apply_struct *apple = (apply_struct *)data_link;
00390   time_stamp expiration_time(-DATA_DECAY_INTERVAL);
00391 
00392   int whack_count = 0;
00393   for (int i = 0; i < bask.elements(); i++) {
00394     infoton_holder &rec = *bask.borrow(i);
00395     if (rec._when_added <= expiration_time) {
00396       // if a requester hasn't picked this up in N seconds, then drop it.
00397 #ifdef DEBUG_ENTITY_DATA_BIN
00398       LOG(istring("whacking old item ") + rec._id.text_form());
00399 #endif
00400       whack_count++;
00401       apple->_items_held--;
00402 //#ifdef DEBUG_ENTITY_DATA_BIN
00403       if (apple->_items_held < 0)
00404         LOG("logic error: number of items went below zero.");
00405 //#endif
00406       bask.zap(i, i);
00407       i--;  // skip back before the delete.
00408     } else {
00409       // NOTE: this break is based on an assumption about the storage of
00410       // items; if it's ever the case in the future that items can be
00411       // disordered on time of arrival in the queue, then the break should
00412       // be removed.
00413       break;
00414     }
00415   }
00416 //#ifdef DEBUG_ENTITY_DATA_BIN
00417   if (whack_count)
00418     LOG(isprintf("==> whacked %d old items.", whack_count));
00419 //#endif
00420   if (!bask.elements()) {
00421     // if the basket has nothing left in it then we signal the parent that
00422     // it can be deleted.
00423     *apple->_empty_baskets += key;
00424   }
00425 
00426   // keep iterating on items unless we know it's time to go.
00427   return true;
00428   #undef static_class_name
00429 }
00430 
00431 void entity_data_bin::clean_out_deadwood()
00432 {
00433   FUNCDEF("clean_out_deadwood");
00434   GRAB_LOCK;
00435   // check that no items have timed out.
00436   apply_struct apple(_items_held);
00437   basketcase empty_baskets;
00438   apple._empty_baskets = &empty_baskets;
00439   _table->apply(cleaning_applier, &apple);
00440 
00441   // clean up any entities whose baskets are empty.
00442   for (int i = empty_baskets.length() - 1; i >= 0; i--) {
00443 #ifdef DEBUG_ENTITY_DATA_BIN
00444      LOG(istring("removing basket ") + empty_baskets.get(i).mangled_form());
00445 #endif
00446     _table->zap(empty_baskets.get(i));
00447     empty_baskets.zap(i, i);
00448     // we don't skip back since we're scanning the array from its end.
00449   }
00450 }
00451 
00452 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
00453     int &bytes)
00454 {
00455   FUNCDEF("get_sizes");
00456   items = 0;
00457   bytes = 0;
00458   GRAB_LOCK;
00459   entity_basket *bask = _table->find(id);
00460   if (!bask || !bask->elements()) return false;
00461   items = bask->elements();
00462   for (int i = 0; i < bask->elements(); i++)
00463     bytes += bask->borrow(i)->_item->packed_size();
00464   return true;
00465 }
00466 
00467 
00468 #endif //ENTITY_DATA_BIN_IMPLEMENTATION_FILE
00469 

Generated on Thu Nov 20 04:29:01 2008 for HOOPLE Libraries by  doxygen 1.5.1