entity_data_bin.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : entity_data_bin                                                   *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include "entity_data_bin.h"
00016 #include "entity_defs.h"
00017 #include "infoton.h"
00018 #include "tentacle.h"
00019 
00020 #include <basis/astring.h>
00021 
00022 #include <basis/mutex.h>
00023 #include <loggers/program_wide_logger.h>
00024 #include <structures/set.h>
00025 #include <structures/string_array.h>
00026 #include <structures/amorph.h>
00027 #include <structures/string_hash.h>
00028 #include <textual/parser_bits.h>
00029 #include <timely/time_stamp.h>
00030 
00031 using namespace basis;
00032 using namespace loggers;
00033 using namespace structures;
00034 using namespace textual;
00035 using namespace timely;
00036 
00037 namespace octopi {
00038 
//#define DEBUG_ENTITY_DATA_BIN
  // uncomment for more debugging information.

// GRAB_LOCK declares an auto_synchronizer named "l" on the bin's mutex; it is
// presumably held for the rest of the enclosing scope.
#undef GRAB_LOCK
#define GRAB_LOCK auto_synchronizer l(*_ent_lock)

// LOG hands a message to the program-wide logger through CLASS_EMERGENCY_LOG.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)

// the hash table for items will have 2^N entries, where N is this value.
const int OCTOPUS_TABLE_BITS = 6;

//hmmm: parameterize in class interface?
  // the allowed age of an item is not fixed in this file; callers supply it
  // to clean_out_deadwood().  if we haven't gotten a data item out to its
  // entity in that long, then we assume the entity has croaked or doesn't
  // want its data.
00056 
00058 
00059 class infoton_holder
00060 {
00061 public:
00062   infoton *_item;      // the data making up the production.
00063   octopus_request_id _id;  // the id, if any, of the original request.
00064   time_stamp _when_added;  // when the data became available.
00065 
00066   infoton_holder(const octopus_request_id &id = octopus_request_id(),
00067       infoton *item = NIL)
00068   : _item(item), _id(id), _when_added() {}
00069 
00070   ~infoton_holder() { WHACK(_item); }
00071 
00072   astring text_form() const {
00073     return astring("id=") + _id.text_form() + ", added="
00074         + _when_added.text_form() + ", item="
00075         + _item->classifier().text_form() + ", data="
00076         + _item->text_form();
00077   }
00078 };
00079 
00081 
00082 class entity_basket : public amorph<infoton_holder>
00083 {
00084 public:
00085   time_stamp _last_active;
00086 
00087   astring text_form() const {
00088     astring to_return;
00089     for (int i = 0; i < elements(); i++)
00090       to_return += get(i)->text_form() + parser_bits::platform_eol_to_chars();
00091     return to_return;
00092   }
00093 };
00094 
00096 
00097 class entity_hasher : public hashing_algorithm
00098 {
00099 public:
00100   virtual hashing_algorithm *clone() const { return new entity_hasher; }
00101 
00102   virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
00103     octopus_entity *key = (octopus_entity *)key_data;
00104     // jiggle the pieces of the id into a number.
00105     return basis::un_int(
00106         key->process_id()
00107         + (key->add_in() << 10)
00108         + (key->sequencer() << 14)
00109         + (key->hostname()[0] << 20)
00110         + (key->hostname()[1] << 24) );
00111   }
00112 };
00113 
00115 
00116 class entity_item_hash
00117 : public hash_table<octopus_entity, entity_basket>
00118 {
00119 public:
00120   entity_item_hash(const entity_hasher &hash)
00121   : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
00122   {}
00123 };
00124 
00126 
00127 class basketcase : public structures::set<octopus_entity>
00128 {
00129 public:
00130 };
00131 
00133 
00134 // used for our apply methods for communicating back to the caller.
00135 struct apply_struct
00136 {
00137   basketcase *_empty_baskets;
00138   entity_basket *_any_item;
00139   int &_items_held;  // hooks to parent's item count.
00140   int _decay_interval;  // how long are items allowed to live?
00141 
00142   apply_struct(int &items_held)
00143       : _empty_baskets(NIL), _any_item(NIL), _items_held(items_held),
00144         _decay_interval(0) {}
00145 };
00146 
00148 
00149 entity_data_bin::entity_data_bin(int max_size_per_entity)
00150 : _table(new entity_item_hash(entity_hasher())),
00151   _ent_lock(new mutex),
00152   _action_count(0),
00153   _max_per_ent(max_size_per_entity),
00154   _items_held(0)
00155 {}
00156 
00157 entity_data_bin::~entity_data_bin()
00158 {
00159   WHACK(_table);
00160   WHACK(_ent_lock);
00161 }
00162 
00163 int entity_data_bin::entities() const
00164 {
00165   GRAB_LOCK;
00166   return _table->elements();
00167 }
00168 
00169 struct text_form_accumulator { astring _accum; };
00170 
00171 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
00172     void *data_link)
00173 {
00174   text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
00175   shuttle->_accum += bask.text_form();
00176   return true;
00177 }
00178 
00179 astring entity_data_bin::text_form() const
00180 {
00181   GRAB_LOCK;
00182   text_form_accumulator shuttle;
00183   _table->apply(text_form_applier, &shuttle);
00184   return shuttle._accum;
00185 }
00186 
00187 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
00188     void *data_link)
00189 {
00190   #undef static_class_name
00191   #define static_class_name() "entity_data_bin"
00192 //  FUNCDEF("scramble_applier");
00193   int *county = (int *)data_link;
00194   *county += bask.elements();
00195   return true;
00196   #undef static_class_name
00197 }
00198 
00199 // this could be extended to do more interesting checks also; currently it's
00200 // just like the entities() method really.
00201 int entity_data_bin::scramble_counter()
00202 {
00203   GRAB_LOCK;
00204   int count = 0;
00205   _table->apply(scramble_applier, &count);
00206   return count;
00207 }
00208 
// in debug builds, DUMP_STATE logs how many items the bin holds on the first
// use and every hundredth use after that; in normal builds it expands to
// nothing.  the debug form is wrapped in do/while so that "DUMP_STATE;" acts
// as a single statement and cannot capture a following "else" the way a bare
// "if" would.
#ifdef DEBUG_ENTITY_DATA_BIN
  #define DUMP_STATE \
    do { \
      if ( !(_action_count++ % 100) ) { \
        int items = scramble_counter(); \
        LOG(a_sprintf("-> %d items counted.", items)); \
      } \
    } while (false)
#else
  #define DUMP_STATE
#endif
00218 
00219 bool entity_data_bin::add_item(infoton *to_add,
00220     const octopus_request_id &orig_id)
00221 {
00222 //  FUNCDEF("add_item");
00223   GRAB_LOCK;
00224   // create a record to add to the appropriate bin.
00225   infoton_holder *holder = new infoton_holder(orig_id, to_add);
00226 
00227   // see if a basket already exists for the entity.
00228   entity_basket *bask = _table->find(orig_id._entity);
00229   if (!bask) {
00230     // this entity doesn't have a basket so add one.
00231     bask = new entity_basket;
00232     _table->add(orig_id._entity, bask);
00233   }
00234 
00235   bask->_last_active = time_stamp();  // reset activity time.
00236 
00237   // count up the current amount of data in use.
00238   int current_size = 0;
00239   for (int i = 0; i < bask->elements(); i++)
00240     current_size += bask->borrow(i)->_item->packed_size();
00241 
00242   if (current_size + to_add->packed_size() > _max_per_ent) {
00243     WHACK(holder);
00244     return false;
00245   }
00246   
00247   // append the latest production to the list.
00248   bask->append(holder);
00249   _items_held++;
00250   return true;
00251 }
00252 
00253 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
00254     void *data_link)
00255 {
00256 //#ifdef DEBUG_ENTITY_DATA_BIN
00257 //  #define static_class_name() "entity_data_bin"
00258 //  FUNCDEF("any_item_applier");
00259 //#endif
00260   apply_struct *apple = (apply_struct *)data_link;
00261   // check the basket to see if it has any items.
00262   if (!bask.elements()) {
00263 //#ifdef DEBUG_ENTITY_DATA_BIN
00264 //    LOG(astring("saw empty basket ") + key.mangled_form());
00265 //#endif
00266     return true;  // continue iterating.
00267   }
00268   apple->_any_item = &bask;
00269   return false;  // stop iteration.
00270   #undef static_class_name
00271 }
00272 
00273 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
00274 {
00275   FUNCDEF("acquire_for_any");
00276   GRAB_LOCK;
00277   apply_struct apple(_items_held);
00278   _table->apply(any_item_applier, &apple);
00279   if (!apple._any_item) return NIL;
00280   DUMP_STATE;
00281   // retrieve the information from our basket that was provided.
00282   infoton_holder *found = apple._any_item->acquire(0);
00283   apple._any_item->zap(0, 0);
00284   if (!apple._any_item->elements()) {
00285     // toss this empty basket.
00286 #ifdef DEBUG_ENTITY_DATA_BIN
00287     LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
00288 #endif
00289     _table->zap(found->_id._entity);
00290   }
00291   apple._any_item = NIL;
00292   infoton *to_return = found->_item;
00293   id = found->_id;
00294   found->_item = NIL;  // clear so it won't be whacked.
00295   WHACK(found);
00296   _items_held--;
00297 //#ifdef DEBUG_ENTITY_DATA_BIN
00298   if (_items_held < 0)
00299     LOG("logic error: number of items went below zero.");
00300 //#endif
00301   return to_return;
00302 }
00303 
00304 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00305     infoton_list &items, int maximum_size)
00306 {
00307 //  FUNCDEF("acquire_for_entity [multiple]");
00308   // this method does not grab the lock because it simply composes other
00309   // class methods without interacting with class data members.
00310   items.reset();
00311   if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
00312     // pick a reasonable default.
00313   octopus_request_id id;
00314   int items_found = 0;
00315   while (maximum_size > 0) {
00316     infoton *inf = acquire_for_entity(requester, id);
00317     if (!inf)
00318       break;  // none left.
00319     items.append(new infoton_id_pair(inf, id));    
00320     maximum_size -= inf->packed_size();
00321     items_found++;
00322   }
00323   return items_found;
00324 }
00325 
00326 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00327     octopus_request_id &id)
00328 {
00329   FUNCDEF("acquire_for_entity [single]");
00330   id = octopus_request_id();  // reset it.
00331   GRAB_LOCK;
00332   infoton *to_return = NIL;
00333   entity_basket *bask = _table->find(requester);
00334   if (!bask) {
00335     return NIL;
00336   }
00337   if (!bask->elements()) {
00338 #ifdef DEBUG_ENTITY_DATA_BIN
00339     LOG(astring("tossing empty basket ") + requester.mangled_form());
00340 #endif
00341     _table->zap(requester);
00342     return NIL;
00343   }
00344   DUMP_STATE;
00345   id = bask->get(0)->_id;
00346   to_return = bask->borrow(0)->_item;
00347   bask->borrow(0)->_item = NIL;
00348   bask->zap(0, 0);
00349   if (!bask->elements()) {
00350 #ifdef DEBUG_ENTITY_DATA_BIN
00351     LOG(astring("tossing empty basket ") + requester.mangled_form());
00352 #endif
00353     _table->zap(requester);
00354   }
00355   _items_held--;
00356 //#ifdef DEBUG_ENTITY_DATA_BIN
00357   if (_items_held < 0)
00358     LOG("logic error: number of items went below zero.");
00359 //#endif
00360   return to_return;
00361 }
00362 
00363 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
00364 {
00365   FUNCDEF("acquire_for_identifier");
00366   infoton *to_return = NIL;
00367   GRAB_LOCK;
00368   entity_basket *bask = _table->find(id._entity);
00369   if (!bask) return NIL;
00370   if (!bask->elements()) {
00371 #ifdef DEBUG_ENTITY_DATA_BIN
00372     LOG(astring("tossing empty basket ") + id._entity.mangled_form());
00373 #endif
00374     _table->zap(id._entity);
00375     return NIL;
00376   }
00377   for (int i = 0; i < bask->elements(); i++) {
00378     if (bask->get(i)->_id == id) {
00379       to_return = bask->borrow(i)->_item;  // snag the item.
00380       bask->borrow(i)->_item = NIL;  // clear the list's version out.
00381       bask->zap(i, i);  // whack the sanitized element.
00382       DUMP_STATE;
00383       if (!bask->elements()) {
00384 #ifdef DEBUG_ENTITY_DATA_BIN
00385         LOG(astring("tossing empty basket ") + id._entity.mangled_form());
00386 #endif
00387         _table->zap(id._entity);
00388       }
00389       _items_held--;
00390 //#ifdef DEBUG_ENTITY_DATA_BIN
00391       if (_items_held < 0)
00392         LOG("logic error: number of items went below zero.");
00393 //#endif
00394       return to_return;
00395     }
00396   }
00397   return NIL;
00398 }
00399 
00400 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
00401     void *data_link)
00402 {
00403   #define static_class_name() "entity_data_bin"
00404   FUNCDEF("cleaning_applier");
00405   apply_struct *apple = (apply_struct *)data_link;
00406   time_stamp expiration_time(-apple->_decay_interval);
00407 
00408   int whack_count = 0;
00409   for (int i = 0; i < bask.elements(); i++) {
00410     infoton_holder &rec = *bask.borrow(i);
00411     if (rec._when_added <= expiration_time) {
00412       // if a requester hasn't picked this up in N seconds, then drop it.
00413 #ifdef DEBUG_ENTITY_DATA_BIN
00414       LOG(astring("whacking old item ") + rec._id.text_form());
00415 #endif
00416       whack_count++;
00417       apple->_items_held--;
00418 //#ifdef DEBUG_ENTITY_DATA_BIN
00419       if (apple->_items_held < 0)
00420         LOG("logic error: number of items went below zero.");
00421 //#endif
00422       bask.zap(i, i);
00423       i--;  // skip back before the delete.
00424     } else {
00425       // NOTE: this break is based on an assumption about the storage of
00426       // items; if it's ever the case in the future that items can be
00427       // disordered on time of arrival in the queue, then the break should
00428       // be removed.
00429       break;
00430     }
00431   }
00432 #ifdef DEBUG_ENTITY_DATA_BIN
00433   if (whack_count)
00434     LOG(a_sprintf("==> whacked %d old items.", whack_count));
00435 #endif
00436   if (!bask.elements()) {
00437     // if the basket has nothing left in it then we signal the parent that
00438     // it can be deleted.
00439 //LOG("adding to empty basket list.");
00440     *apple->_empty_baskets += key;
00441 //LOG("added to empty basket list.");
00442   }
00443 
00444   // keep iterating on items unless we know it's time to go.
00445   return true;
00446   #undef static_class_name
00447 }
00448 
00449 void entity_data_bin::clean_out_deadwood(int decay_interval)
00450 {
00451 #ifdef DEBUG_ENTITY_DATA_BIN
00452   FUNCDEF("clean_out_deadwood");
00453 #endif
00454   GRAB_LOCK;
00455   // check that no items have timed out.
00456   apply_struct apple(_items_held);
00457   basketcase empty_baskets;
00458   apple._empty_baskets = &empty_baskets;
00459   apple._decay_interval = decay_interval;
00460   _table->apply(cleaning_applier, &apple);
00461 
00462   // clean up any entities whose baskets are empty.
00463   for (int i = empty_baskets.length() - 1; i >= 0; i--) {
00464 #ifdef DEBUG_ENTITY_DATA_BIN
00465      LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
00466 #endif
00467     _table->zap(empty_baskets.get(i));
00468     empty_baskets.zap(i, i);
00469     // we don't skip back since we're scanning the array from its end.
00470   }
00471 }
00472 
00473 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
00474     int &bytes)
00475 {
00476 //  FUNCDEF("get_sizes");
00477   items = 0;
00478   bytes = 0;
00479   GRAB_LOCK;
00480   entity_basket *bask = _table->find(id);
00481   if (!bask || !bask->elements()) return false;
00482   items = bask->elements();
00483   for (int i = 0; i < bask->elements(); i++)
00484     bytes += bask->borrow(i)->_item->packed_size();
00485   return true;
00486 }
00487 
00488 } //namespace.
00489 
Generated on Sat Jan 28 04:22:42 2012 for hoople2 project by  doxygen 1.6.3