00001 #ifndef ENTITY_DATA_BIN_IMPLEMENTATION_FILE
00002 #define ENTITY_DATA_BIN_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "entity_data_bin.h"
00019 #include "entity_defs.h"
00020 #include "infoton.h"
00021 #include "tentacle.h"
00022
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/mutex.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <data_struct/string_hash.cpp>
00028 #include <mechanisms/time_stamp.h>
00029
00030
00031
00032
00033 #undef GRAB_LOCK
00034 #define GRAB_LOCK \
00035 auto_synchronizer l(*_ent_lock)
00036
00037 #undef LOG
00038 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00039
00040 const int OCTOPUS_TABLE_BITS = 6;
00041
00042
00043
00044 const int DATA_DECAY_INTERVAL = 4 * MINUTE_ms;
00045
00046
00047
00049
00050 class infoton_holder
00051 {
00052 public:
00053 infoton *_item;
00054 octopus_request_id _id;
00055 time_stamp _when_added;
00056
00057 infoton_holder(const octopus_request_id &id = octopus_request_id(),
00058 infoton *item = NIL)
00059 : _item(item), _id(id), _when_added() {}
00060
00061 ~infoton_holder() { WHACK(_item); }
00062
00063 istring text_form() const {
00064 return istring("id=") + _id.text_form() + ", added="
00065 + _when_added.text_form() + ", item="
00066 + _item->classifier().text_form() + ", data="
00067 + _item->text_form();
00068 }
00069 };
00070
00072
00073 class entity_basket : public amorph<infoton_holder>
00074 {
00075 public:
00076 time_stamp _last_active;
00077
00078 istring text_form() const {
00079 istring to_return;
00080 for (int i = 0; i < elements(); i++)
00081 to_return += get(i)->text_form() + log_base::platform_ending();
00082 return to_return;
00083 }
00084 };
00085
00087
00088 class entity_hasher : public hashing_algorithm
00089 {
00090 public:
00091 virtual hashing_algorithm *clone() const { return new entity_hasher; }
00092
00093 virtual u_int hash(const void *key_data, int formal(key_length)) const {
00094 octopus_entity *key = (octopus_entity *)key_data;
00095
00096 return u_int(
00097 key->process_id()
00098 + (key->add_in() << 10)
00099 + (key->sequencer() << 14)
00100 + (key->hostname()[0] << 20)
00101 + (key->hostname()[1] << 24) );
00102 }
00103 };
00104
00106
00107 class entity_item_hash
00108 : public hash_table<octopus_entity, entity_basket>
00109 {
00110 public:
00111 entity_item_hash(const entity_hasher &hash)
00112 : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
00113 {}
00114 };
00115
00117
00118 class basketcase : public basis::set<octopus_entity>
00119 {
00120 public:
00121 };
00122
00124
00125
00126 struct apply_struct
00127 {
00128 basketcase *_empty_baskets;
00129 entity_basket *_any_item;
00130 int &_items_held;
00131
00132 apply_struct(int &items_held)
00133 : _empty_baskets(NIL), _any_item(NIL), _items_held(items_held) {}
00134 };
00135
00137
00138 entity_data_bin::entity_data_bin(int max_size_per_entity)
00139 : _table(new entity_item_hash(entity_hasher())),
00140 _ent_lock(new mutex),
00141 _action_count(0),
00142 _max_per_ent(max_size_per_entity),
00143 _items_held(0)
00144 {}
00145
00146 entity_data_bin::~entity_data_bin()
00147 {
00148 WHACK(_table);
00149 WHACK(_ent_lock);
00150 }
00151
00152 int entity_data_bin::entities() const
00153 {
00154 GRAB_LOCK;
00155 return _table->elements();
00156 }
00157
00158 struct text_form_accumulator { istring _accum; };
00159
00160 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
00161 void *data_link)
00162 {
00163 text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
00164 shuttle->_accum += bask.text_form();
00165 return true;
00166 }
00167
00168 istring entity_data_bin::text_form() const
00169 {
00170 GRAB_LOCK;
00171 text_form_accumulator shuttle;
00172 _table->apply(text_form_applier, &shuttle);
00173 return shuttle._accum;
00174 }
00175
00176 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
00177 void *data_link)
00178 {
00179 #undef static_class_name
00180 #define static_class_name() "entity_data_bin"
00181 FUNCDEF("scramble_applier");
00182 int *county = (int *)data_link;
00183 *county += bask.elements();
00184 return true;
00185 #undef static_class_name
00186 }
00187
00188
00189
00190 int entity_data_bin::scramble_counter()
00191 {
00192 int count = 0;
00193 _table->apply(scramble_applier, &count);
00194 return count;
00195 }
00196
// DUMP_STATE: in debug builds, bumps _action_count on every use and, each
// time the pre-increment count is a multiple of 100, logs the item total
// from scramble_counter().  in other builds it expands to nothing.  the
// debug body is wrapped in do/while so that "DUMP_STATE;" acts as a single
// statement even inside an unbraced if/else.
#ifdef DEBUG_ENTITY_DATA_BIN
  #define DUMP_STATE \
    do { \
      if ( !(_action_count++ % 100) ) { \
        int items = scramble_counter(); \
        LOG(isprintf("-> %d items counted.", items)); \
      } \
    } while (false)
#else
  #define DUMP_STATE
#endif
00206
00207 bool entity_data_bin::add_item(infoton *to_add,
00208 const octopus_request_id &orig_id)
00209 {
00210 FUNCDEF("add_item");
00211 GRAB_LOCK;
00212
00213 infoton_holder *holder = new infoton_holder(orig_id, to_add);
00214
00215
00216 entity_basket *bask = _table->find(orig_id._entity);
00217 if (!bask) {
00218
00219 bask = new entity_basket;
00220 _table->add(orig_id._entity, bask);
00221 }
00222
00223 bask->_last_active = time_stamp();
00224
00225
00226 int current_size = 0;
00227 for (int i = 0; i < bask->elements(); i++)
00228 current_size += bask->borrow(i)->_item->packed_size();
00229
00230 if (current_size + to_add->packed_size() > _max_per_ent) {
00231 WHACK(holder);
00232 return false;
00233 }
00234
00235
00236 bask->append(holder);
00237 _items_held++;
00238 return true;
00239 }
00240
00241 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
00242 void *data_link)
00243 {
00244 #define static_class_name() "entity_data_bin"
00245 FUNCDEF("any_item_applier");
00246 apply_struct *apple = (apply_struct *)data_link;
00247
00248 if (!bask.elements()) {
00249 #ifdef DEBUG_ENTITY_DATA_BIN
00250 LOG(istring("saw empty basket ") + key.mangled_form());
00251 #endif
00252 return true;
00253 }
00254 apple->_any_item = &bask;
00255 return false;
00256 #undef static_class_name
00257 }
00258
00259 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
00260 {
00261 FUNCDEF("acquire_for_any");
00262 GRAB_LOCK;
00263 apply_struct apple(_items_held);
00264 _table->apply(any_item_applier, &apple);
00265 if (!apple._any_item) return NIL;
00266 DUMP_STATE;
00267
00268 infoton_holder *found = apple._any_item->acquire(0);
00269 apple._any_item->zap(0, 0);
00270 if (!apple._any_item->elements()) {
00271
00272 #ifdef DEBUG_ENTITY_DATA_BIN
00273 LOG(istring("tossing empty basket ") + found->_id._entity.mangled_form());
00274 #endif
00275 _table->zap(found->_id._entity);
00276 }
00277 apple._any_item = NIL;
00278 infoton *to_return = found->_item;
00279 id = found->_id;
00280 found->_item = NIL;
00281 WHACK(found);
00282 _items_held--;
00283
00284 if (_items_held < 0)
00285 LOG("logic error: number of items went below zero.");
00286
00287 return to_return;
00288 }
00289
00290 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00291 infoton_list &items, int maximum_size)
00292 {
00293 FUNCDEF("acquire_for_entity [multiple]");
00294 items.reset();
00295 if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
00296
00297 octopus_request_id id;
00298 int items_found = 0;
00299 while (maximum_size > 0) {
00300 infoton *inf = acquire_for_entity(requester, id);
00301 if (!inf)
00302 break;
00303 items.append(new infoton_id_pair(inf, id));
00304 maximum_size -= inf->packed_size();
00305 items_found++;
00306 }
00307 return items_found;
00308 }
00309
00310 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00311 octopus_request_id &id)
00312 {
00313 FUNCDEF("acquire_for_entity [single]");
00314 id = octopus_request_id();
00315 GRAB_LOCK;
00316 infoton *to_return = NIL;
00317 entity_basket *bask = _table->find(requester);
00318 if (!bask) {
00319 return NIL;
00320 }
00321 if (!bask->elements()) {
00322 #ifdef DEBUG_ENTITY_DATA_BIN
00323 LOG(istring("tossing empty basket ") + requester.mangled_form());
00324 #endif
00325 _table->zap(requester);
00326 return NIL;
00327 }
00328 DUMP_STATE;
00329 id = bask->get(0)->_id;
00330 to_return = bask->borrow(0)->_item;
00331 bask->borrow(0)->_item = NIL;
00332 bask->zap(0, 0);
00333 if (!bask->elements()) {
00334 #ifdef DEBUG_ENTITY_DATA_BIN
00335 LOG(istring("tossing empty basket ") + requester.mangled_form());
00336 #endif
00337 _table->zap(requester);
00338 }
00339 _items_held--;
00340
00341 if (_items_held < 0)
00342 LOG("logic error: number of items went below zero.");
00343
00344 return to_return;
00345 }
00346
00347 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
00348 {
00349 FUNCDEF("acquire_for_identifier");
00350 infoton *to_return = NIL;
00351 GRAB_LOCK;
00352 entity_basket *bask = _table->find(id._entity);
00353 if (!bask) return NIL;
00354 if (!bask->elements()) {
00355 #ifdef DEBUG_ENTITY_DATA_BIN
00356 LOG(istring("tossing empty basket ") + id._entity.mangled_form());
00357 #endif
00358 _table->zap(id._entity);
00359 return NIL;
00360 }
00361 for (int i = 0; i < bask->elements(); i++) {
00362 if (bask->get(i)->_id == id) {
00363 to_return = bask->borrow(i)->_item;
00364 bask->borrow(i)->_item = NIL;
00365 bask->zap(i, i);
00366 DUMP_STATE;
00367 if (!bask->elements()) {
00368 #ifdef DEBUG_ENTITY_DATA_BIN
00369 LOG(istring("tossing empty basket ") + id._entity.mangled_form());
00370 #endif
00371 _table->zap(id._entity);
00372 }
00373 _items_held--;
00374
00375 if (_items_held < 0)
00376 LOG("logic error: number of items went below zero.");
00377
00378 return to_return;
00379 }
00380 }
00381 return NIL;
00382 }
00383
00384 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
00385 void *data_link)
00386 {
00387 #define static_class_name() "entity_data_bin"
00388 FUNCDEF("cleaning_applier");
00389 apply_struct *apple = (apply_struct *)data_link;
00390 time_stamp expiration_time(-DATA_DECAY_INTERVAL);
00391
00392 int whack_count = 0;
00393 for (int i = 0; i < bask.elements(); i++) {
00394 infoton_holder &rec = *bask.borrow(i);
00395 if (rec._when_added <= expiration_time) {
00396
00397 #ifdef DEBUG_ENTITY_DATA_BIN
00398 LOG(istring("whacking old item ") + rec._id.text_form());
00399 #endif
00400 whack_count++;
00401 apple->_items_held--;
00402
00403 if (apple->_items_held < 0)
00404 LOG("logic error: number of items went below zero.");
00405
00406 bask.zap(i, i);
00407 i--;
00408 } else {
00409
00410
00411
00412
00413 break;
00414 }
00415 }
00416
00417 if (whack_count)
00418 LOG(isprintf("==> whacked %d old items.", whack_count));
00419
00420 if (!bask.elements()) {
00421
00422
00423 *apple->_empty_baskets += key;
00424 }
00425
00426
00427 return true;
00428 #undef static_class_name
00429 }
00430
00431 void entity_data_bin::clean_out_deadwood()
00432 {
00433 FUNCDEF("clean_out_deadwood");
00434 GRAB_LOCK;
00435
00436 apply_struct apple(_items_held);
00437 basketcase empty_baskets;
00438 apple._empty_baskets = &empty_baskets;
00439 _table->apply(cleaning_applier, &apple);
00440
00441
00442 for (int i = empty_baskets.length() - 1; i >= 0; i--) {
00443 #ifdef DEBUG_ENTITY_DATA_BIN
00444 LOG(istring("removing basket ") + empty_baskets.get(i).mangled_form());
00445 #endif
00446 _table->zap(empty_baskets.get(i));
00447 empty_baskets.zap(i, i);
00448
00449 }
00450 }
00451
00452 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
00453 int &bytes)
00454 {
00455 FUNCDEF("get_sizes");
00456 items = 0;
00457 bytes = 0;
00458 GRAB_LOCK;
00459 entity_basket *bask = _table->find(id);
00460 if (!bask || !bask->elements()) return false;
00461 items = bask->elements();
00462 for (int i = 0; i < bask->elements(); i++)
00463 bytes += bask->borrow(i)->_item->packed_size();
00464 return true;
00465 }
00466
00467
00468 #endif //ENTITY_DATA_BIN_IMPLEMENTATION_FILE
00469