00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "entity_data_bin.h"
00016 #include "entity_defs.h"
00017 #include "identity_tentacle.h"
00018 #include "infoton.h"
00019 #include "octopus.h"
00020 #include "tentacle.h"
00021 #include "unhandled_request.h"
00022
00023 #include <basis/astring.h>
00024 #include <basis/mutex.h>
00025 #include <configuration/application_configuration.h>
00026 #include <loggers/critical_events.h>
00027 #include <loggers/program_wide_logger.h>
00028 #include <mathematics/chaos.h>
00029 #include <structures/amorph.h>
00030 #include <structures/string_hash.h>
00031 #include <timely/time_control.h>
00032 #include <timely/time_stamp.h>
00033
00034 using namespace basis;
00035 using namespace configuration;
00036 using namespace loggers;
00037 using namespace mathematics;
00038 using namespace processes;
00039 using namespace structures;
00040 using namespace timely;
00041
00042 namespace octopi {
00043
00044
00045
00046
00047
00048
00049 #undef GRAB_LOCK
00050 #define GRAB_LOCK \
00051 auto_synchronizer l(*_molock)
00052
00053
00054
00055
00056
00057 #define WHACK_RETURN(to_ret, to_whack) { \
00058 unhandled_request *bad_response = new unhandled_request(id, \
00059 request->classifier(), to_ret); \
00060 _responses->add_item(bad_response, id); \
00061 WHACK(to_whack); \
00062 return to_ret; \
00063 }
00064
00065 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
00066
00067
00068 #undef LOG
00069 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
00070
00071 const int OCTOPUS_CHECKING_INTERVAL = 4 * MINUTE_ms;
00072
00073
00074
00075
00077
00078 class filter_list : public array<tentacle *>
00079 {
00080 public:
00081 bool remove(tentacle *to_remove) {
00082 for (int i = 0; i < length(); i++) {
00083 if (get(i) == to_remove) {
00084 zap(i, i);
00085 return true;
00086 }
00087 }
00088 return false;
00089 }
00090 };
00091
00093
00094 class tentacle_record
00095 {
00096 public:
00097 tentacle *_limb;
00098 bool _filter;
00099
00100 tentacle_record(tentacle *limb, bool filter)
00101 : _limb(limb), _filter(filter) {}
00102
00103 ~tentacle_record() { WHACK(_limb); }
00104 };
00105
00107
00108 class modula_oblongata : public amorph<tentacle_record>
00109 {
00110 public:
00111 modula_oblongata() : amorph<tentacle_record>() {}
00112
00113 int find_index(const string_array &group) {
00114 for (int i = 0; i < elements(); i++) {
00115 if (borrow(i)->_limb->group().prefix_compare(group))
00116 return i;
00117 }
00118 return common::NOT_FOUND;
00119 }
00120
00121 tentacle *find(const string_array &group) {
00122 int indy = find_index(group);
00123 if (negative(indy)) return NIL;
00124 return borrow(indy)->_limb;
00125 }
00126
00127 bool zap(int a, int b) {
00128 outcome ret = amorph<tentacle_record>::zap(a, b);
00129 return ret == common::OKAY;
00130 }
00131
00132 bool zap(const string_array &group) {
00133 int indy = find_index(group);
00134 if (negative(indy)) return false;
00135 amorph<tentacle_record>::zap(indy, indy);
00136 return true;
00137 }
00138 };
00139
00141
00142 octopus::octopus(const astring &name, int max_per_ent)
00143 : _name(new astring(name)),
00144 _tentacles(new modula_oblongata),
00145 _molock(new mutex),
00146 _responses(new entity_data_bin(max_per_ent)),
00147 _disallow_removals(0),
00148 _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
00149 _clean_lock(new mutex),
00150 _filters(new filter_list),
00151 _sequencer(new safe_roller(1, MAXINT32 / 2)),
00152 _rando(new chaos)
00153 {
00154 add_tentacle(new identity_tentacle(*this), true);
00155
00156 add_tentacle(new unhandled_request_tentacle(false), false);
00157
00158 }
00159
00160 octopus::~octopus()
00161 {
00162
00163 WHACK(_filters);
00164 WHACK(_tentacles);
00165 WHACK(_responses);
00166 WHACK(_next_cleaning);
00167 WHACK(_clean_lock);
00168 WHACK(_name);
00169 WHACK(_molock);
00170 WHACK(_rando);
00171 WHACK(_sequencer);
00172 }
00173
00174 void octopus::lock_tentacles() { _molock->lock(); }
00175
00176 void octopus::unlock_tentacles() { _molock->unlock(); }
00177
00178 entity_data_bin &octopus::responses() { return *_responses; }
00179
00180 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
00181
00182 const astring &octopus::name() const { return *_name; }
00183
00184 tentacle *octopus::locked_get_tentacle(int indy)
00185 { return _tentacles->borrow(indy)->_limb; }
00186
00187 infoton *octopus::acquire_specific_result(const octopus_request_id &id)
00188 { return _responses->acquire_for_identifier(id); }
00189
00190 infoton *octopus::acquire_result(const octopus_entity &requester,
00191 octopus_request_id &id)
00192 { return _responses->acquire_for_entity(requester, id); }
00193
00194 void octopus::unlock_tentacle(tentacle *to_unlock)
00195 {
00196 to_unlock = NIL;
00197 _molock->unlock();
00198 }
00199
00200 void octopus::expunge(const octopus_entity &to_remove)
00201 {
00202 FUNCDEF("expunge");
00203 {
00204
00205 GRAB_LOCK;
00206 _disallow_removals++;
00207 }
00208
00209
00210
00211 int len = _tentacles->elements();
00212 for (int i = 0; i < len; i++) {
00213 tentacle_record *curr = _tentacles->borrow(i);
00214 if (!curr || !curr->_limb) {
00215
00216 continue;
00217 }
00218
00219 curr->_limb->expunge(to_remove);
00220 }
00221
00222 {
00223
00224 GRAB_LOCK;
00225 _disallow_removals--;
00226 }
00227
00228
00229 int items_found = 1;
00230 infoton_list junk;
00231 while (items_found) {
00232
00233 items_found = responses().acquire_for_entity(to_remove, junk,
00234 MAXIMUM_TRASH_SIZE);
00235 junk.reset();
00236
00237 if (items_found)
00238 LOG(a_sprintf("cleaned %d items for expunged entity ", items_found)
00239 + to_remove.mangled_form());
00240
00241 }
00242
00243 }
00244
00245 outcome octopus::zap_tentacle(const string_array &tentacle_name)
00246 {
00247 tentacle *found = NIL;
00248 outcome ret = remove_tentacle(tentacle_name, found);
00249 WHACK(found);
00250 return ret;
00251 }
00252
00253 outcome octopus::add_tentacle(tentacle *to_add, bool filter)
00254 {
00255 FUNCDEF("add_tentacle");
00256 if (!to_add) return tentacle::BAD_INPUT;
00257 if (!to_add->group().length()) return tentacle::BAD_INPUT;
00258 outcome zapped_it = zap_tentacle(to_add->group());
00259 if (zapped_it == tentacle::OKAY) {
00260
00261 LOG(astring("removed existing tentacle: ") + to_add->group().text_form());
00262
00263 }
00264 GRAB_LOCK;
00265 tentacle *found = _tentacles->find(to_add->group());
00266
00267
00268 if (found) return tentacle::ALREADY_EXISTS;
00269 to_add->attach_storage(*_responses);
00270 tentacle_record *new_record = new tentacle_record(to_add, filter);
00271 _tentacles->append(new_record);
00272 if (filter) *_filters += to_add;
00273 #ifdef DEBUG_OCTOPUS
00274 LOG(astring("added tentacle on ") + to_add->group().text_form());
00275 #endif
00276 return tentacle::OKAY;
00277 }
00278
00279 outcome octopus::remove_tentacle(const string_array &group_name,
00280 tentacle * &free_me)
00281 {
00282 FUNCDEF("remove_tentacle");
00283 free_me = NIL;
00284 if (!group_name.length()) return tentacle::BAD_INPUT;
00285 while (true) {
00286
00287
00288 _molock->lock();
00289 if (!_disallow_removals) {
00290
00291
00292 break;
00293 }
00294 if (_disallow_removals < 0) {
00295 continuable_error(class_name(), func, "logic error in removal "
00296 "reference counter.");
00297 }
00298 _molock->unlock();
00299 time_control::sleep_ms(0);
00300 }
00301 int indy = _tentacles->find_index(group_name);
00302 if (negative(indy)) {
00303
00304 _molock->unlock();
00305 return tentacle::NOT_FOUND;
00306 }
00307
00308 tentacle_record *freeing = _tentacles->acquire(indy);
00309 _tentacles->zap(indy, indy);
00310 free_me = freeing->_limb;
00311 _filters->remove(free_me);
00312 _molock->unlock();
00313 freeing->_limb = NIL;
00314 WHACK(freeing);
00315 return tentacle::OKAY;
00316 }
00317
00318 outcome octopus::restore(const string_array &classifier,
00319 byte_array &packed_form, infoton * &reformed)
00320 {
00321 #ifdef DEBUG_OCTOPUS
00322 FUNCDEF("restore");
00323 #endif
00324 periodic_cleaning();
00325
00326 reformed = NIL;
00327 if (!classifier.length()) return tentacle::BAD_INPUT;
00328 if (!packed_form.length()) return tentacle::BAD_INPUT;
00329 if (!classifier.length()) return tentacle::BAD_INPUT;
00330 {
00331
00332 GRAB_LOCK;
00333 _disallow_removals++;
00334 }
00335 tentacle *found = _tentacles->find(classifier);
00336 outcome to_return;
00337 if (!found) {
00338 #ifdef DEBUG_OCTOPUS
00339 LOG(astring("tentacle not found for: ") + classifier.text_form());
00340 #endif
00341 to_return = tentacle::NOT_FOUND;
00342 } else {
00343 to_return = found->reconstitute(classifier, packed_form, reformed);
00344 }
00345
00346 GRAB_LOCK;
00347 _disallow_removals--;
00348 return to_return;
00349 }
00350
00351 outcome octopus::evaluate(infoton *request, const octopus_request_id &id,
00352 bool now)
00353 {
00354 FUNCDEF("evaluate");
00355 periodic_cleaning();
00356
00357
00358 if (!request->classifier().length()) {
00359 #ifdef DEBUG_OCTOPUS
00360 LOG("failed due to empty classifier.");
00361 #endif
00362 WHACK_RETURN(tentacle::BAD_INPUT, request);
00363 }
00364
00365 _molock->lock();
00366
00367
00368 _disallow_removals++;
00369
00370
00371 for (int i = 0; i < _filters->length(); i++) {
00372 tentacle *current = (*_filters)[i];
00373 #ifdef DEBUG_OCTOPUS_FILTERS
00374 LOG(a_sprintf("%d: checking ", i + 1) + current->group().text_form());
00375 #endif
00376
00377
00378 bool is_relevant = current->group().prefix_compare(request->classifier());
00379
00380 #ifdef DEBUG_OCTOPUS_FILTERS
00381 if (is_relevant)
00382 LOG(astring("found it to be relevant! for ") + id.text_form())
00383 else
00384 LOG(astring("found it to not be relevant. for ") + id.text_form());
00385 #endif
00386
00387
00388 _molock->unlock();
00389
00390
00391 byte_array transformed;
00392
00393 outcome to_return = current->consume(*request, id, transformed);
00394
00395
00396 if (is_relevant) {
00397
00398
00399 #ifdef DEBUG_OCTOPUS_FILTERS
00400 LOG(astring("filter ") + current->group().text_form() + " consumed "
00401 "infoton from " + id.text_form() + " with result "
00402 + tentacle::outcome_name(to_return));
00403 #endif
00404 WHACK(request);
00405 GRAB_LOCK;
00406 _disallow_removals--;
00407 return to_return;
00408 } else {
00409
00410 #ifdef DEBUG_OCTOPUS_FILTERS
00411 LOG(astring("filter ") + current->group().text_form() + " vetted "
00412 "infoton " + id.text_form() + " with result "
00413 + tentacle::outcome_name(to_return));
00414 #endif
00415 if (to_return == tentacle::PARTIAL) {
00416
00417
00418
00419
00420 if (transformed.length()) {
00421
00422 string_array classif;
00423 byte_array decro;
00424 bool worked = infoton::fast_unpack(transformed, classif, decro);
00425 if (!worked) {
00426 LOG("failed to fast_unpack the transformed data.");
00427 } else {
00428 infoton *new_req = NIL;
00429 outcome rest_ret = restore(classif, decro, new_req);
00430 if (rest_ret == tentacle::OKAY) {
00431
00432 WHACK(request);
00433 request = new_req;
00434 } else {
00435 LOG("failed to restore transformed infoton.");
00436 }
00437 }
00438 }
00439
00440 _molock->lock();
00441 continue;
00442 } else {
00443
00444 #ifdef DEBUG_OCTOPUS_FILTERS
00445 LOG(astring("filter ") + current->group().text_form() + " denied "
00446 "infoton from " + id.text_form());
00447 #endif
00448 {
00449 GRAB_LOCK;
00450 _disallow_removals--;
00451 }
00452 WHACK_RETURN(to_return, request);
00453 }
00454 }
00455 }
00456
00457
00458
00459 #ifdef DEBUG_OCTOPUS_FILTERS
00460 LOG(astring("all filters approved infoton: ") + id.text_form());
00461 #endif
00462
00463
00464 tentacle *found = _tentacles->find(request->classifier());
00465
00466 _molock->unlock();
00467
00468
00469
00470 if (!found) {
00471 #ifdef DEBUG_OCTOPUS
00472 LOG(astring("tentacle not found for: ")
00473 + request->classifier().text_form());
00474 #endif
00475 GRAB_LOCK;
00476 _disallow_removals--;
00477 WHACK_RETURN(tentacle::NOT_FOUND, request);
00478 }
00479
00480
00481 if (!now && found->backgrounding()) {
00482
00483 outcome to_return = found->enqueue(request, id);
00484 GRAB_LOCK;
00485 _disallow_removals--;
00486 return to_return;
00487 } else {
00488
00489 byte_array ignored;
00490 outcome to_return = found->consume(*request, id, ignored);
00491 WHACK(request);
00492 GRAB_LOCK;
00493 _disallow_removals--;
00494 return to_return;
00495 }
00496 }
00497
00498 void octopus::periodic_cleaning()
00499 {
00500
00501 time_stamp next_time;
00502 {
00503 auto_synchronizer l(*_clean_lock);
00504 next_time = *_next_cleaning;
00505 }
00506 if (next_time < time_stamp()) {
00507
00508 _responses->clean_out_deadwood();
00509 auto_synchronizer l(*_clean_lock);
00510
00511 _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
00512 }
00513 }
00514
00515 tentacle *octopus::lock_tentacle(const string_array &tentacle_name)
00516 {
00517 if (!tentacle_name.length()) return NIL;
00518 _molock->lock();
00519 tentacle *found = _tentacles->find(tentacle_name);
00520 if (!found) {
00521 _molock->unlock();
00522 return NIL;
00523 }
00524 return found;
00525 }
00526
00527 octopus_entity octopus::issue_identity()
00528 {
00529 return octopus_entity(*_name, application_configuration::process_id(), _sequencer->next_id(),
00530 _rando->inclusive(0, MAXINT32 / 4));
00531 }
00532
00533 }
00534