00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "entity_data_bin.h"
00016 #include "entity_defs.h"
00017 #include "infoton.h"
00018 #include "tentacle.h"
00019
00020 #include <basis/astring.h>
00021
00022 #include <basis/mutex.h>
00023 #include <loggers/program_wide_logger.h>
00024 #include <structures/set.h>
00025 #include <structures/string_array.h>
00026 #include <structures/amorph.h>
00027 #include <structures/string_hash.h>
00028 #include <textual/parser_bits.h>
00029 #include <timely/time_stamp.h>
00030
00031 using namespace basis;
00032 using namespace loggers;
00033 using namespace structures;
00034 using namespace textual;
00035 using namespace timely;
00036
00037 namespace octopi {
00038
00039
00040
00041
// declares an auto_synchronizer named "l" over *_ent_lock at the point of
// use, so it only works where an "_ent_lock" member is in scope.
#undef GRAB_LOCK
#define GRAB_LOCK auto_synchronizer l(*_ent_lock)

// sends the message "s" through the program-wide logger.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)

// the second argument handed to the hash_table constructor for the bin's
// table (presumably the table size expressed in bits -- confirm against
// hash_table).
const int OCTOPUS_TABLE_BITS = 6;
00050
00051
00052
00054
00055
00056
00058
00059 class infoton_holder
00060 {
00061 public:
00062 infoton *_item;
00063 octopus_request_id _id;
00064 time_stamp _when_added;
00065
00066 infoton_holder(const octopus_request_id &id = octopus_request_id(),
00067 infoton *item = NIL)
00068 : _item(item), _id(id), _when_added() {}
00069
00070 ~infoton_holder() { WHACK(_item); }
00071
00072 astring text_form() const {
00073 return astring("id=") + _id.text_form() + ", added="
00074 + _when_added.text_form() + ", item="
00075 + _item->classifier().text_form() + ", data="
00076 + _item->text_form();
00077 }
00078 };
00079
00081
00082 class entity_basket : public amorph<infoton_holder>
00083 {
00084 public:
00085 time_stamp _last_active;
00086
00087 astring text_form() const {
00088 astring to_return;
00089 for (int i = 0; i < elements(); i++)
00090 to_return += get(i)->text_form() + parser_bits::platform_eol_to_chars();
00091 return to_return;
00092 }
00093 };
00094
00096
00097 class entity_hasher : public hashing_algorithm
00098 {
00099 public:
00100 virtual hashing_algorithm *clone() const { return new entity_hasher; }
00101
00102 virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
00103 octopus_entity *key = (octopus_entity *)key_data;
00104
00105 return basis::un_int(
00106 key->process_id()
00107 + (key->add_in() << 10)
00108 + (key->sequencer() << 14)
00109 + (key->hostname()[0] << 20)
00110 + (key->hostname()[1] << 24) );
00111 }
00112 };
00113
00115
00116 class entity_item_hash
00117 : public hash_table<octopus_entity, entity_basket>
00118 {
00119 public:
00120 entity_item_hash(const entity_hasher &hash)
00121 : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
00122 {}
00123 };
00124
00126
00127 class basketcase : public structures::set<octopus_entity>
00128 {
00129 public:
00130 };
00131
00133
00134
00135 struct apply_struct
00136 {
00137 basketcase *_empty_baskets;
00138 entity_basket *_any_item;
00139 int &_items_held;
00140 int _decay_interval;
00141
00142 apply_struct(int &items_held)
00143 : _empty_baskets(NIL), _any_item(NIL), _items_held(items_held),
00144 _decay_interval(0) {}
00145 };
00146
00148
00149 entity_data_bin::entity_data_bin(int max_size_per_entity)
00150 : _table(new entity_item_hash(entity_hasher())),
00151 _ent_lock(new mutex),
00152 _action_count(0),
00153 _max_per_ent(max_size_per_entity),
00154 _items_held(0)
00155 {}
00156
00157 entity_data_bin::~entity_data_bin()
00158 {
00159 WHACK(_table);
00160 WHACK(_ent_lock);
00161 }
00162
00163 int entity_data_bin::entities() const
00164 {
00165 GRAB_LOCK;
00166 return _table->elements();
00167 }
00168
00169 struct text_form_accumulator { astring _accum; };
00170
00171 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
00172 void *data_link)
00173 {
00174 text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
00175 shuttle->_accum += bask.text_form();
00176 return true;
00177 }
00178
00179 astring entity_data_bin::text_form() const
00180 {
00181 GRAB_LOCK;
00182 text_form_accumulator shuttle;
00183 _table->apply(text_form_applier, &shuttle);
00184 return shuttle._accum;
00185 }
00186
00187 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
00188 void *data_link)
00189 {
00190 #undef static_class_name
00191 #define static_class_name() "entity_data_bin"
00192
00193 int *county = (int *)data_link;
00194 *county += bask.elements();
00195 return true;
00196 #undef static_class_name
00197 }
00198
00199
00200
00201 int entity_data_bin::scramble_counter()
00202 {
00203 GRAB_LOCK;
00204 int count = 0;
00205 _table->apply(scramble_applier, &count);
00206 return count;
00207 }
00208
// with DEBUG_ENTITY_DATA_BIN defined, DUMP_STATE bumps _action_count on each
// use and logs the counted number of items whenever the count it saw was a
// multiple of 100.  without the flag it expands to nothing.
#ifdef DEBUG_ENTITY_DATA_BIN
  #define DUMP_STATE \
    if ((_action_count++ % 100) == 0) { \
      int items = scramble_counter(); \
      LOG(a_sprintf("-> %d items counted.", items)); \
    }
#else
  #define DUMP_STATE
#endif
00218
00219 bool entity_data_bin::add_item(infoton *to_add,
00220 const octopus_request_id &orig_id)
00221 {
00222
00223 GRAB_LOCK;
00224
00225 infoton_holder *holder = new infoton_holder(orig_id, to_add);
00226
00227
00228 entity_basket *bask = _table->find(orig_id._entity);
00229 if (!bask) {
00230
00231 bask = new entity_basket;
00232 _table->add(orig_id._entity, bask);
00233 }
00234
00235 bask->_last_active = time_stamp();
00236
00237
00238 int current_size = 0;
00239 for (int i = 0; i < bask->elements(); i++)
00240 current_size += bask->borrow(i)->_item->packed_size();
00241
00242 if (current_size + to_add->packed_size() > _max_per_ent) {
00243 WHACK(holder);
00244 return false;
00245 }
00246
00247
00248 bask->append(holder);
00249 _items_held++;
00250 return true;
00251 }
00252
00253 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
00254 void *data_link)
00255 {
00256
00257
00258
00259
00260 apply_struct *apple = (apply_struct *)data_link;
00261
00262 if (!bask.elements()) {
00263
00264
00265
00266 return true;
00267 }
00268 apple->_any_item = &bask;
00269 return false;
00270 #undef static_class_name
00271 }
00272
00273 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
00274 {
00275 FUNCDEF("acquire_for_any");
00276 GRAB_LOCK;
00277 apply_struct apple(_items_held);
00278 _table->apply(any_item_applier, &apple);
00279 if (!apple._any_item) return NIL;
00280 DUMP_STATE;
00281
00282 infoton_holder *found = apple._any_item->acquire(0);
00283 apple._any_item->zap(0, 0);
00284 if (!apple._any_item->elements()) {
00285
00286 #ifdef DEBUG_ENTITY_DATA_BIN
00287 LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
00288 #endif
00289 _table->zap(found->_id._entity);
00290 }
00291 apple._any_item = NIL;
00292 infoton *to_return = found->_item;
00293 id = found->_id;
00294 found->_item = NIL;
00295 WHACK(found);
00296 _items_held--;
00297
00298 if (_items_held < 0)
00299 LOG("logic error: number of items went below zero.");
00300
00301 return to_return;
00302 }
00303
00304 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00305 infoton_list &items, int maximum_size)
00306 {
00307
00308
00309
00310 items.reset();
00311 if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
00312
00313 octopus_request_id id;
00314 int items_found = 0;
00315 while (maximum_size > 0) {
00316 infoton *inf = acquire_for_entity(requester, id);
00317 if (!inf)
00318 break;
00319 items.append(new infoton_id_pair(inf, id));
00320 maximum_size -= inf->packed_size();
00321 items_found++;
00322 }
00323 return items_found;
00324 }
00325
00326 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
00327 octopus_request_id &id)
00328 {
00329 FUNCDEF("acquire_for_entity [single]");
00330 id = octopus_request_id();
00331 GRAB_LOCK;
00332 infoton *to_return = NIL;
00333 entity_basket *bask = _table->find(requester);
00334 if (!bask) {
00335 return NIL;
00336 }
00337 if (!bask->elements()) {
00338 #ifdef DEBUG_ENTITY_DATA_BIN
00339 LOG(astring("tossing empty basket ") + requester.mangled_form());
00340 #endif
00341 _table->zap(requester);
00342 return NIL;
00343 }
00344 DUMP_STATE;
00345 id = bask->get(0)->_id;
00346 to_return = bask->borrow(0)->_item;
00347 bask->borrow(0)->_item = NIL;
00348 bask->zap(0, 0);
00349 if (!bask->elements()) {
00350 #ifdef DEBUG_ENTITY_DATA_BIN
00351 LOG(astring("tossing empty basket ") + requester.mangled_form());
00352 #endif
00353 _table->zap(requester);
00354 }
00355 _items_held--;
00356
00357 if (_items_held < 0)
00358 LOG("logic error: number of items went below zero.");
00359
00360 return to_return;
00361 }
00362
00363 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
00364 {
00365 FUNCDEF("acquire_for_identifier");
00366 infoton *to_return = NIL;
00367 GRAB_LOCK;
00368 entity_basket *bask = _table->find(id._entity);
00369 if (!bask) return NIL;
00370 if (!bask->elements()) {
00371 #ifdef DEBUG_ENTITY_DATA_BIN
00372 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
00373 #endif
00374 _table->zap(id._entity);
00375 return NIL;
00376 }
00377 for (int i = 0; i < bask->elements(); i++) {
00378 if (bask->get(i)->_id == id) {
00379 to_return = bask->borrow(i)->_item;
00380 bask->borrow(i)->_item = NIL;
00381 bask->zap(i, i);
00382 DUMP_STATE;
00383 if (!bask->elements()) {
00384 #ifdef DEBUG_ENTITY_DATA_BIN
00385 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
00386 #endif
00387 _table->zap(id._entity);
00388 }
00389 _items_held--;
00390
00391 if (_items_held < 0)
00392 LOG("logic error: number of items went below zero.");
00393
00394 return to_return;
00395 }
00396 }
00397 return NIL;
00398 }
00399
00400 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
00401 void *data_link)
00402 {
00403 #define static_class_name() "entity_data_bin"
00404 FUNCDEF("cleaning_applier");
00405 apply_struct *apple = (apply_struct *)data_link;
00406 time_stamp expiration_time(-apple->_decay_interval);
00407
00408 int whack_count = 0;
00409 for (int i = 0; i < bask.elements(); i++) {
00410 infoton_holder &rec = *bask.borrow(i);
00411 if (rec._when_added <= expiration_time) {
00412
00413 #ifdef DEBUG_ENTITY_DATA_BIN
00414 LOG(astring("whacking old item ") + rec._id.text_form());
00415 #endif
00416 whack_count++;
00417 apple->_items_held--;
00418
00419 if (apple->_items_held < 0)
00420 LOG("logic error: number of items went below zero.");
00421
00422 bask.zap(i, i);
00423 i--;
00424 } else {
00425
00426
00427
00428
00429 break;
00430 }
00431 }
00432 #ifdef DEBUG_ENTITY_DATA_BIN
00433 if (whack_count)
00434 LOG(a_sprintf("==> whacked %d old items.", whack_count));
00435 #endif
00436 if (!bask.elements()) {
00437
00438
00439
00440 *apple->_empty_baskets += key;
00441
00442 }
00443
00444
00445 return true;
00446 #undef static_class_name
00447 }
00448
00449 void entity_data_bin::clean_out_deadwood(int decay_interval)
00450 {
00451 #ifdef DEBUG_ENTITY_DATA_BIN
00452 FUNCDEF("clean_out_deadwood");
00453 #endif
00454 GRAB_LOCK;
00455
00456 apply_struct apple(_items_held);
00457 basketcase empty_baskets;
00458 apple._empty_baskets = &empty_baskets;
00459 apple._decay_interval = decay_interval;
00460 _table->apply(cleaning_applier, &apple);
00461
00462
00463 for (int i = empty_baskets.length() - 1; i >= 0; i--) {
00464 #ifdef DEBUG_ENTITY_DATA_BIN
00465 LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
00466 #endif
00467 _table->zap(empty_baskets.get(i));
00468 empty_baskets.zap(i, i);
00469
00470 }
00471 }
00472
00473 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
00474 int &bytes)
00475 {
00476
00477 items = 0;
00478 bytes = 0;
00479 GRAB_LOCK;
00480 entity_basket *bask = _table->find(id);
00481 if (!bask || !bask->elements()) return false;
00482 items = bask->elements();
00483 for (int i = 0; i < bask->elements(); i++)
00484 bytes += bask->borrow(i)->_item->packed_size();
00485 return true;
00486 }
00487
00488 }
00489