00001 #ifndef OCTOPUS_IMPLEMENTATION_FILE
00002 #define OCTOPUS_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "entity_data_bin.h"
00019 #include "entity_defs.h"
00020 #include "identity_tentacle.h"
00021 #include "infoton.h"
00022 #include "octopus.h"
00023 #include "tentacle.h"
00024 #include "unhandled_request.h"
00025
00026 #include <basis/chaos.h>
00027 #include <basis/istring.h>
00028 #include <basis/log_base.h>
00029 #include <basis/mutex.h>
00030 #include <basis/portable.h>
00031 #include <data_struct/amorph.cpp>
00032 #include <data_struct/string_hash.cpp>
00033 #include <mechanisms/safe_roller.h>
00034 #include <mechanisms/time_stamp.h>
00035
00036
00037
00038
00039
00040
00041 #undef GRAB_LOCK
00042 #define GRAB_LOCK \
00043 auto_synchronizer l(*_molock)
00044
00045
00046
00047
00048
00049 #define WHACK_RETURN(to_ret, to_whack) { \
00050 unhandled_request *bad_response = new unhandled_request(id, \
00051 request->classifier(), to_ret); \
00052 _responses->add_item(bad_response, id); \
00053 WHACK(to_whack); \
00054 return to_ret; \
00055 }
00056
00057 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
00058
00059
00060 #undef LOG
00061 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger(), t)
00062
00063 const int OCTOPUS_CHECKING_INTERVAL = 4 * MINUTE_ms;
00064
00065
00066
00067
00069
00070 class filter_list : public array<tentacle *>
00071 {
00072 public:
00073 bool remove(tentacle *to_remove) {
00074 for (int i = 0; i < length(); i++) {
00075 if (get(i) == to_remove) {
00076 zap(i, i);
00077 return true;
00078 }
00079 }
00080 return false;
00081 }
00082 };
00083
00085
00086 class tentacle_record
00087 {
00088 public:
00089 tentacle *_limb;
00090 bool _filter;
00091
00092 tentacle_record(tentacle *limb, bool filter)
00093 : _limb(limb), _filter(filter) {}
00094
00095 ~tentacle_record() { WHACK(_limb); }
00096 };
00097
00099
00100 class modula_oblongata : public amorph<tentacle_record>
00101 {
00102 public:
00103 modula_oblongata() : amorph<tentacle_record>() {}
00104
00105 int find_index(const string_array &group) {
00106 for (int i = 0; i < elements(); i++) {
00107 if (borrow(i)->_limb->group().prefix_compare(group))
00108 return i;
00109 }
00110 return common::NOT_FOUND;
00111 }
00112
00113 tentacle *find(const string_array &group) {
00114 int indy = find_index(group);
00115 if (negative(indy)) return NIL;
00116 return borrow(indy)->_limb;
00117 }
00118
00119 bool zap(int a, int b) {
00120 outcome ret = amorph<tentacle_record>::zap(a, b);
00121 return ret == common::OKAY;
00122 }
00123
00124 bool zap(const string_array &group) {
00125 int indy = find_index(group);
00126 if (negative(indy)) return false;
00127 amorph<tentacle_record>::zap(indy, indy);
00128 return true;
00129 }
00130 };
00131
00133
00134 octopus::octopus(const istring &name, int max_per_ent)
00135 : _name(new istring(name)),
00136 _tentacles(new modula_oblongata),
00137 _molock(new mutex),
00138 _responses(new entity_data_bin(max_per_ent)),
00139 _disallow_removals(0),
00140 _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
00141 _clean_lock(new mutex),
00142 _filters(new filter_list),
00143 _sequencer(new safe_roller(1, MAXINT / 2)),
00144 _rando(new chaos)
00145 {
00146 add_tentacle(new identity_tentacle(*this), true);
00147
00148 add_tentacle(new unhandled_request_tentacle(false), false);
00149
00150 }
00151
00152 octopus::~octopus()
00153 {
00154 FUNCDEF("destructor");
00155 WHACK(_filters);
00156 WHACK(_tentacles);
00157 WHACK(_responses);
00158 WHACK(_next_cleaning);
00159 WHACK(_clean_lock);
00160 WHACK(_name);
00161 WHACK(_molock);
00162 WHACK(_rando);
00163 WHACK(_sequencer);
00164 }
00165
00166 void octopus::lock_tentacles() { _molock->lock(); }
00167
00168 void octopus::unlock_tentacles() { _molock->unlock(); }
00169
00170 entity_data_bin &octopus::responses() { return *_responses; }
00171
00172 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
00173
00174 const istring &octopus::name() const { return *_name; }
00175
00176 tentacle *octopus::locked_get_tentacle(int indy)
00177 { return _tentacles->borrow(indy)->_limb; }
00178
00179 infoton *octopus::acquire_specific_result(const octopus_request_id &id)
00180 { return _responses->acquire_for_identifier(id); }
00181
00182 infoton *octopus::acquire_result(const octopus_entity &requester,
00183 octopus_request_id &id)
00184 { return _responses->acquire_for_entity(requester, id); }
00185
00186 void octopus::unlock_tentacle(tentacle *to_unlock)
00187 {
00188 to_unlock = NIL;
00189 _molock->unlock();
00190 }
00191
00192 void octopus::expunge(const octopus_entity &to_remove)
00193 {
00194 FUNCDEF("expunge");
00195 {
00196
00197 GRAB_LOCK;
00198 _disallow_removals++;
00199 }
00200
00201
00202
00203 int len = _tentacles->elements();
00204 for (int i = 0; i < len; i++) {
00205 tentacle_record *curr = _tentacles->borrow(i);
00206 if (!curr || !curr->_limb) {
00207
00208 continue;
00209 }
00210
00211 curr->_limb->expunge(to_remove);
00212 }
00213
00214 {
00215
00216 GRAB_LOCK;
00217 _disallow_removals--;
00218 }
00219
00220
00221 int items_found = 1;
00222 infoton_list junk;
00223 while (items_found) {
00224
00225 items_found = responses().acquire_for_entity(to_remove, junk,
00226 MAXIMUM_TRASH_SIZE);
00227 junk.reset();
00228
00229 if (items_found)
00230 LOG(isprintf("cleaned %d items for expunged entity ", items_found)
00231 + to_remove.mangled_form());
00232
00233 }
00234
00235 }
00236
00237 outcome octopus::zap_tentacle(const string_array &tentacle_name)
00238 {
00239 tentacle *found = NIL;
00240 outcome ret = remove_tentacle(tentacle_name, found);
00241 WHACK(found);
00242 return ret;
00243 }
00244
00245 outcome octopus::add_tentacle(tentacle *to_add, bool filter)
00246 {
00247 FUNCDEF("add_tentacle");
00248 if (!to_add) return tentacle::BAD_INPUT;
00249 if (!to_add->group().length()) return tentacle::BAD_INPUT;
00250 outcome zapped_it = zap_tentacle(to_add->group());
00251 if (zapped_it == tentacle::OKAY) {
00252
00253 LOG(istring("removed existing tentacle: ") + to_add->group().text_form());
00254
00255 }
00256 GRAB_LOCK;
00257 tentacle *found = _tentacles->find(to_add->group());
00258
00259
00260 if (found) return tentacle::ALREADY_EXISTS;
00261 to_add->attach_storage(*_responses);
00262 tentacle_record *new_record = new tentacle_record(to_add, filter);
00263 _tentacles->append(new_record);
00264 if (filter) *_filters += to_add;
00265 #ifdef DEBUG_OCTOPUS
00266 LOG(istring("added tentacle on ") + to_add->group().text_form());
00267 #endif
00268 return tentacle::OKAY;
00269 }
00270
00271 outcome octopus::remove_tentacle(const string_array &group_name,
00272 tentacle * &free_me)
00273 {
00274 FUNCDEF("remove_tentacle");
00275 free_me = NIL;
00276 if (!group_name.length()) return tentacle::BAD_INPUT;
00277 while (true) {
00278
00279
00280 _molock->lock();
00281 if (!_disallow_removals) {
00282
00283
00284 break;
00285 }
00286 if (_disallow_removals < 0) {
00287 continuable_error(class_name(), func, "logic error in removal "
00288 "reference counter.");
00289 }
00290 _molock->unlock();
00291 portable::sleep_ms(0);
00292 }
00293 int indy = _tentacles->find_index(group_name);
00294 if (negative(indy)) {
00295
00296 _molock->unlock();
00297 return tentacle::NOT_FOUND;
00298 }
00299
00300 tentacle_record *freeing = _tentacles->acquire(indy);
00301 _tentacles->zap(indy, indy);
00302 free_me = freeing->_limb;
00303 _filters->remove(free_me);
00304 _molock->unlock();
00305 freeing->_limb = NIL;
00306 WHACK(freeing);
00307 return tentacle::OKAY;
00308 }
00309
00310 outcome octopus::restore(const string_array &classifier,
00311 byte_array &packed_form, infoton * &reformed)
00312 {
00313 FUNCDEF("restore");
00314 periodic_cleaning();
00315
00316 reformed = NIL;
00317 if (!classifier.length()) return tentacle::BAD_INPUT;
00318 if (!packed_form.length()) return tentacle::BAD_INPUT;
00319 if (!classifier.length()) return tentacle::BAD_INPUT;
00320 {
00321
00322 GRAB_LOCK;
00323 _disallow_removals++;
00324 }
00325 tentacle *found = _tentacles->find(classifier);
00326 outcome to_return;
00327 if (!found) {
00328 #ifdef DEBUG_OCTOPUS
00329 LOG(istring("tentacle not found for: ") + classifier.text_form());
00330 #endif
00331 to_return = tentacle::NOT_FOUND;
00332 } else {
00333 to_return = found->reconstitute(classifier, packed_form, reformed);
00334 }
00335
00336 GRAB_LOCK;
00337 _disallow_removals--;
00338 return to_return;
00339 }
00340
00341 outcome octopus::evaluate(infoton *request, const octopus_request_id &id,
00342 bool now)
00343 {
00344 FUNCDEF("evaluate");
00345 periodic_cleaning();
00346
00347
00348 if (!request->classifier().length()) {
00349 #ifdef DEBUG_OCTOPUS
00350 LOG("failed due to empty classifier.");
00351 #endif
00352 WHACK_RETURN(tentacle::BAD_INPUT, request);
00353 }
00354
00355 _molock->lock();
00356
00357
00358 _disallow_removals++;
00359
00360
00361 for (int i = 0; i < _filters->length(); i++) {
00362 tentacle *current = (*_filters)[i];
00363 #ifdef DEBUG_OCTOPUS_FILTERS
00364 LOG(isprintf("%d: checking ", i + 1) + current->group().text_form());
00365 #endif
00366
00367
00368 bool is_relevant = current->group().prefix_compare(request->classifier());
00369
00370 #ifdef DEBUG_OCTOPUS_FILTERS
00371 if (is_relevant)
00372 LOG(istring("found it to be relevant! for ") + id.text_form())
00373 else
00374 LOG(istring("found it to not be relevant. for ") + id.text_form());
00375 #endif
00376
00377
00378 _molock->unlock();
00379
00380
00381 byte_array transformed;
00382
00383 outcome to_return = current->consume(*request, id, transformed);
00384
00385
00386 if (is_relevant) {
00387
00388
00389 #ifdef DEBUG_OCTOPUS_FILTERS
00390 LOG(istring("filter ") + current->group().text_form() + " consumed "
00391 "infoton from " + id.text_form() + " with result "
00392 + tentacle::outcome_name(to_return));
00393 #endif
00394 WHACK(request);
00395 GRAB_LOCK;
00396 _disallow_removals--;
00397 return to_return;
00398 } else {
00399
00400 #ifdef DEBUG_OCTOPUS_FILTERS
00401 LOG(istring("filter ") + current->group().text_form() + " vetted "
00402 "infoton " + id.text_form() + " with result "
00403 + tentacle::outcome_name(to_return));
00404 #endif
00405 if (to_return == tentacle::PARTIAL) {
00406
00407
00408
00409
00410 if (transformed.length()) {
00411
00412 string_array classif;
00413 byte_array decro;
00414 bool worked = infoton::fast_unpack(transformed, classif, decro);
00415 if (!worked) {
00416 LOG("failed to fast_unpack the transformed data.");
00417 } else {
00418 infoton *new_req = NIL;
00419 outcome rest_ret = restore(classif, decro, new_req);
00420 if (rest_ret == tentacle::OKAY) {
00421
00422 WHACK(request);
00423 request = new_req;
00424 } else {
00425 LOG("failed to restore transformed infoton.");
00426 }
00427 }
00428 }
00429
00430 _molock->lock();
00431 continue;
00432 } else {
00433
00434 #ifdef DEBUG_OCTOPUS_FILTERS
00435 LOG(istring("filter ") + current->group().text_form() + " denied "
00436 "infoton from " + id.text_form());
00437 #endif
00438 {
00439 GRAB_LOCK;
00440 _disallow_removals--;
00441 }
00442 WHACK_RETURN(to_return, request);
00443 }
00444 }
00445 }
00446
00447
00448
00449 #ifdef DEBUG_OCTOPUS_FILTERS
00450 LOG(istring("all filters approved infoton: ") + id.text_form());
00451 #endif
00452
00453
00454 tentacle *found = _tentacles->find(request->classifier());
00455
00456 _molock->unlock();
00457
00458
00459
00460 if (!found) {
00461 #ifdef DEBUG_OCTOPUS
00462 LOG(istring("tentacle not found for: ")
00463 + request->classifier().text_form());
00464 #endif
00465 GRAB_LOCK;
00466 _disallow_removals--;
00467 WHACK_RETURN(tentacle::NOT_FOUND, request);
00468 }
00469
00470
00471 if (!now && found->backgrounding()) {
00472
00473 outcome to_return = found->enqueue(request, id);
00474 GRAB_LOCK;
00475 _disallow_removals--;
00476 return to_return;
00477 } else {
00478
00479 byte_array ignored;
00480 outcome to_return = found->consume(*request, id, ignored);
00481 WHACK(request);
00482 GRAB_LOCK;
00483 _disallow_removals--;
00484 return to_return;
00485 }
00486 }
00487
00488 void octopus::periodic_cleaning()
00489 {
00490 FUNCDEF("periodic_cleaning");
00491 time_stamp next_time;
00492 {
00493 auto_synchronizer l(*_clean_lock);
00494 next_time = *_next_cleaning;
00495 }
00496 if (next_time < time_stamp()) {
00497
00498 _responses->clean_out_deadwood();
00499 auto_synchronizer l(*_clean_lock);
00500
00501 _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
00502 }
00503 }
00504
00505 tentacle *octopus::lock_tentacle(const string_array &tentacle_name)
00506 {
00507 if (!tentacle_name.length()) return NIL;
00508 _molock->lock();
00509 tentacle *found = _tentacles->find(tentacle_name);
00510 if (!found) {
00511 _molock->unlock();
00512 return NIL;
00513 }
00514 return found;
00515 }
00516
00517 octopus_entity octopus::issue_identity()
00518 {
00519 return octopus_entity(*_name, portable::process_id(), _sequencer->next_id(),
00520 _rando->inclusive(0, MAXINT / 4));
00521 }
00522
00523
00524 #endif //OCTOPUS_IMPLEMENTATION_FILE
00525