octopus.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : octopus                                                           *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include "entity_data_bin.h"
00016 #include "entity_defs.h"
00017 #include "identity_tentacle.h"
00018 #include "infoton.h"
00019 #include "octopus.h"
00020 #include "tentacle.h"
00021 #include "unhandled_request.h"
00022 
00023 #include <basis/astring.h>
00024 #include <basis/mutex.h>
00025 #include <configuration/application_configuration.h>
00026 #include <loggers/critical_events.h>
00027 #include <loggers/program_wide_logger.h>
00028 #include <mathematics/chaos.h>
00029 #include <structures/amorph.h>
00030 #include <structures/string_hash.h>
00031 #include <timely/time_control.h>
00032 #include <timely/time_stamp.h>
00033 
00034 using namespace basis;
00035 using namespace configuration;
00036 using namespace loggers;
00037 using namespace mathematics;
00038 using namespace processes;
00039 using namespace structures;
00040 using namespace timely;
00041 
00042 namespace octopi {
00043 
00044 //#define DEBUG_OCTOPUS
00045   // uncomment for debugging noise.
00046 //#define DEBUG_OCTOPUS_FILTERS
00047   // uncomment for noisy filter processing.
00048 
00049 #undef GRAB_LOCK
00050 #define GRAB_LOCK \
00051   auto_synchronizer l(*_molock)
00052 
00053 // this macro returns a result and deletes the request due to a failure.  it
00054 // stores a response for the request, in case they were expecting one, since
00055 // otherwise they will wait a long time for a response that isn't coming.  if
00056 // those responses are never picked up, they will eventually be cleaned out.
00057 #define WHACK_RETURN(to_ret, to_whack) { \
00058   unhandled_request *bad_response = new unhandled_request(id, \
00059       request->classifier(), to_ret); \
00060   _responses->add_item(bad_response, id); \
00061   WHACK(to_whack); \
00062   return to_ret; \
00063 }
00064 
00065 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
00066   // this is how much we'll toss out on closing an entity.
00067 
00068 #undef LOG
00069 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
00070 
00071 const int OCTOPUS_CHECKING_INTERVAL = 4 * MINUTE_ms;
00072   // the frequency in milliseconds of cleaning on the response bin.  this
00073   // doesn't need to happen very often; it only tosses data that has been
00074   // abandoned in the response bin.
00075 
00077 
00078 class filter_list : public array<tentacle *>
00079 {
00080 public:
00081   bool remove(tentacle *to_remove) {
00082     for (int i = 0; i < length(); i++) {
00083       if (get(i) == to_remove) {
00084         zap(i, i);
00085         return true;
00086       }
00087     }
00088     return false;
00089   }
00090 };
00091 
00093 
00094 class tentacle_record 
00095 {
00096 public:
00097   tentacle *_limb;
00098   bool _filter;
00099 
00100   tentacle_record(tentacle *limb, bool filter)
00101       : _limb(limb), _filter(filter) {}
00102 
00103   ~tentacle_record() { WHACK(_limb); }
00104 };
00105 
00107 
00108 class modula_oblongata : public amorph<tentacle_record>
00109 {
00110 public:
00111   modula_oblongata() : amorph<tentacle_record>() {}
00112 
00113   int find_index(const string_array &group) {
00114     for (int i = 0; i < elements(); i++) {
00115       if (borrow(i)->_limb->group().prefix_compare(group))
00116         return i;
00117     }
00118     return common::NOT_FOUND;
00119   }
00120 
00121   tentacle *find(const string_array &group) {
00122     int indy = find_index(group);
00123     if (negative(indy)) return NIL;
00124     return borrow(indy)->_limb;
00125   }
00126 
00127   bool zap(int a, int b) {
00128     outcome ret = amorph<tentacle_record>::zap(a, b);
00129     return ret == common::OKAY;
00130   }
00131 
00132   bool zap(const string_array &group) {
00133     int indy = find_index(group);
00134     if (negative(indy)) return false;
00135     amorph<tentacle_record>::zap(indy, indy);
00136     return true;
00137   }
00138 };
00139 
00141 
00142 octopus::octopus(const astring &name, int max_per_ent)
00143 : _name(new astring(name)),
00144   _tentacles(new modula_oblongata),
00145   _molock(new mutex),
00146   _responses(new entity_data_bin(max_per_ent)),
00147   _disallow_removals(0),
00148   _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
00149   _clean_lock(new mutex),
00150   _filters(new filter_list),
00151   _sequencer(new safe_roller(1, MAXINT32 / 2)),
00152   _rando(new chaos)
00153 {
00154   add_tentacle(new identity_tentacle(*this), true);
00155     // register a way to issue identities.  this is a filter.
00156   add_tentacle(new unhandled_request_tentacle(false), false);
00157     // provide a way to unpack the unhandled_request object.
00158 }
00159 
00160 octopus::~octopus()
00161 {
00162 //  FUNCDEF("destructor");
00163   WHACK(_filters);
00164   WHACK(_tentacles);
00165   WHACK(_responses);
00166   WHACK(_next_cleaning);
00167   WHACK(_clean_lock);
00168   WHACK(_name);
00169   WHACK(_molock);
00170   WHACK(_rando);
00171   WHACK(_sequencer);
00172 }
00173 
00174 void octopus::lock_tentacles() { _molock->lock(); }
00175 
00176 void octopus::unlock_tentacles() { _molock->unlock(); }
00177 
00178 entity_data_bin &octopus::responses() { return *_responses; }
00179 
00180 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
00181 
00182 const astring &octopus::name() const { return *_name; }
00183 
00184 tentacle *octopus::locked_get_tentacle(int indy)
00185 { return _tentacles->borrow(indy)->_limb; }
00186 
00187 infoton *octopus::acquire_specific_result(const octopus_request_id &id)
00188 { return _responses->acquire_for_identifier(id); }
00189 
00190 infoton *octopus::acquire_result(const octopus_entity &requester,
00191     octopus_request_id &id)
00192 { return _responses->acquire_for_entity(requester, id); }
00193 
00194 void octopus::unlock_tentacle(tentacle *to_unlock)
00195 {
00196   to_unlock = NIL;
00197   _molock->unlock();
00198 }
00199 
00200 void octopus::expunge(const octopus_entity &to_remove)
00201 {
00202   FUNCDEF("expunge");
00203   {
00204     // temporary lock so we can keep tentacles from evaporating.
00205     GRAB_LOCK;
00206     _disallow_removals++;
00207   }
00208 
00209   // we've now ensured that no tentacles will be removed, so at most the
00210   // list would get longer.  we'll settle on its current length.
00211   int len = _tentacles->elements();
00212   for (int i = 0; i < len; i++) {
00213     tentacle_record *curr = _tentacles->borrow(i);
00214     if (!curr || !curr->_limb) {
00215 //complain... logic error.
00216       continue;
00217     }
00218     // activate the expunge method on the current tentacle.
00219     curr->_limb->expunge(to_remove);
00220   }
00221 
00222   {
00223     // re-enable tentacle removals.
00224     GRAB_LOCK;
00225     _disallow_removals--;
00226   }
00227 
00228   // throw out any data that was waiting for that guy.
00229   int items_found = 1;
00230   infoton_list junk;
00231   while (items_found) {
00232     // grab a chunk of items to be trashed.
00233     items_found = responses().acquire_for_entity(to_remove, junk,
00234         MAXIMUM_TRASH_SIZE);
00235     junk.reset();
00236 //#ifdef DEBUG_OCTOPUS
00237     if (items_found)
00238       LOG(a_sprintf("cleaned %d items for expunged entity ", items_found)
00239           + to_remove.mangled_form());
00240 //#endif
00241   }
00242 
00243 }
00244 
00245 outcome octopus::zap_tentacle(const string_array &tentacle_name)
00246 {
00247   tentacle *found = NIL;
00248   outcome ret = remove_tentacle(tentacle_name, found);
00249   WHACK(found);
00250   return ret;
00251 }
00252 
00253 outcome octopus::add_tentacle(tentacle *to_add, bool filter)
00254 {
00255   FUNCDEF("add_tentacle");
00256   if (!to_add) return tentacle::BAD_INPUT;
00257   if (!to_add->group().length()) return tentacle::BAD_INPUT;
00258   outcome zapped_it = zap_tentacle(to_add->group());
00259   if (zapped_it == tentacle::OKAY) {
00260 //#ifdef DEBUG_OCTOPUS
00261     LOG(astring("removed existing tentacle: ") + to_add->group().text_form());
00262 //#endif
00263   }
00264   GRAB_LOCK;
00265   tentacle *found = _tentacles->find(to_add->group());
00266   // if found is non-NIL, then that would be a serious logic error since
00267   // we just zapped it above.
00268   if (found) return tentacle::ALREADY_EXISTS;
00269   to_add->attach_storage(*_responses);
00270   tentacle_record *new_record = new tentacle_record(to_add, filter);
00271   _tentacles->append(new_record);
00272   if (filter) *_filters += to_add;
00273 #ifdef DEBUG_OCTOPUS
00274   LOG(astring("added tentacle on ") + to_add->group().text_form());
00275 #endif
00276   return tentacle::OKAY;
00277 }
00278 
00279 outcome octopus::remove_tentacle(const string_array &group_name,
00280     tentacle * &free_me)
00281 {
00282   FUNCDEF("remove_tentacle");
00283   free_me = NIL;
00284   if (!group_name.length()) return tentacle::BAD_INPUT;
00285   while (true) {
00286     // repeatedly grab the lock and make sure we're allowed to remove.  if
00287     // we're told we can't remove yet, then we drop the lock again and pause.
00288     _molock->lock();
00289     if (!_disallow_removals) {
00290       // we ARE allowed to remove it right now.  we leave the loop in
00291       // possession of the lock.
00292       break;
00293     }
00294     if (_disallow_removals < 0) {
00295       continuable_error(class_name(), func, "logic error in removal "
00296           "reference counter.");
00297     }
00298     _molock->unlock();
00299     time_control::sleep_ms(0);  // yield thread's execution to another thread.
00300   }
00301   int indy = _tentacles->find_index(group_name);
00302   if (negative(indy)) {
00303     // nope, no match.
00304     _molock->unlock();
00305     return tentacle::NOT_FOUND;
00306   }
00307   // found the match.
00308   tentacle_record *freeing = _tentacles->acquire(indy);
00309   _tentacles->zap(indy, indy);
00310   free_me = freeing->_limb;
00311   _filters->remove(free_me);
00312   _molock->unlock();
00313   freeing->_limb = NIL;
00314   WHACK(freeing);
00315   return tentacle::OKAY;
00316 }
00317 
00318 outcome octopus::restore(const string_array &classifier,
00319     byte_array &packed_form, infoton * &reformed)
00320 {
00321 #ifdef DEBUG_OCTOPUS
00322   FUNCDEF("restore");
00323 #endif
00324   periodic_cleaning();  // freshen up if it's that time.
00325 
00326   reformed = NIL;
00327   if (!classifier.length()) return tentacle::BAD_INPUT;
00328   if (!packed_form.length()) return tentacle::BAD_INPUT;
00329   if (!classifier.length()) return tentacle::BAD_INPUT;
00330   {
00331     // keep anyone from being removed until we're done.
00332     GRAB_LOCK;
00333     _disallow_removals++;
00334   }
00335   tentacle *found = _tentacles->find(classifier);
00336   outcome to_return;
00337   if (!found) {
00338 #ifdef DEBUG_OCTOPUS
00339     LOG(astring("tentacle not found for: ") + classifier.text_form());
00340 #endif
00341     to_return = tentacle::NOT_FOUND;
00342   } else {
00343     to_return = found->reconstitute(classifier, packed_form, reformed);
00344   }
00345   // re-enable tentacle removals.
00346   GRAB_LOCK;
00347   _disallow_removals--;
00348   return to_return;
00349 }
00350 
00351 outcome octopus::evaluate(infoton *request, const octopus_request_id &id,
00352     bool now)
00353 {
00354   FUNCDEF("evaluate");
00355   periodic_cleaning();  // freshen up if it's that time.
00356 
00357   // check that the classifier is well formed.
00358   if (!request->classifier().length()) {
00359 #ifdef DEBUG_OCTOPUS
00360     LOG("failed due to empty classifier.");
00361 #endif
00362     WHACK_RETURN(tentacle::BAD_INPUT, request);
00363   }
00364 
00365   _molock->lock();
00366 
00367   // block tentacle removals while we're working.
00368   _disallow_removals++;
00369 
00370   // ensure that we pass this infoton through all the filters for vetting.
00371   for (int i = 0; i < _filters->length(); i++) {
00372     tentacle *current = (*_filters)[i];
00373 #ifdef DEBUG_OCTOPUS_FILTERS
00374     LOG(a_sprintf("%d: checking ", i + 1) + current->group().text_form());
00375 #endif
00376 
00377     // check if the infoton is addressed specifically by this filter.
00378     bool is_relevant = current->group().prefix_compare(request->classifier());
00379 
00380 #ifdef DEBUG_OCTOPUS_FILTERS
00381     if (is_relevant)
00382       LOG(astring("found it to be relevant!  for ") + id.text_form())
00383     else
00384       LOG(astring("found it to not be relevant.  for ") + id.text_form());
00385 #endif
00386 
00387     // this infoton is _for_ this filter.
00388     _molock->unlock();
00389       // unlock octopus to allow others to operate.
00390 
00391     byte_array transformed;
00392 //hmmm: maybe there should be a separate filter method?
00393     outcome to_return = current->consume(*request, id, transformed);
00394       // pass the infoton into the current filter.
00395 
00396     if (is_relevant) {
00397       // the infoton was _for_ the current filter.  that means that we are
00398       // done processing it now.
00399 #ifdef DEBUG_OCTOPUS_FILTERS
00400       LOG(astring("filter ") + current->group().text_form() + " consumed "
00401           "infoton from " + id.text_form() + " with result "
00402           + tentacle::outcome_name(to_return));
00403 #endif
00404       WHACK(request);
00405       GRAB_LOCK;  // short re-establishment of the lock.
00406       _disallow_removals--;
00407       return to_return;
00408     } else {
00409       // the infoton was vetted by the filter.  make sure it was liked.
00410 #ifdef DEBUG_OCTOPUS_FILTERS
00411       LOG(astring("filter ") + current->group().text_form() + " vetted "
00412           "infoton " + id.text_form() + " with result "
00413           + tentacle::outcome_name(to_return));
00414 #endif
00415       if (to_return == tentacle::PARTIAL) {
00416         // if the infoton is partially complete, then we're allowed to keep
00417         // going.  this outcome means it was not prohibited.
00418 
00419         // make sure they didn't switch it out on us.
00420         if (transformed.length()) {
00421           // we need to substitute the transformed version for the original.
00422           string_array classif;
00423           byte_array decro;  // decrypted packed infoton.
00424           bool worked = infoton::fast_unpack(transformed, classif, decro);
00425           if (!worked) {
00426             LOG("failed to fast_unpack the transformed data.");
00427           } else {
00428             infoton *new_req = NIL;
00429             outcome rest_ret = restore(classif, decro, new_req);
00430             if (rest_ret == tentacle::OKAY) {
00431               // we got a good transformed version.
00432               WHACK(request);
00433               request = new_req;  // substitution complete.
00434             } else {
00435               LOG("failed to restore transformed infoton.");
00436             }
00437           }
00438         }
00439 
00440         _molock->lock();  // get the lock again.
00441         continue;
00442       } else {
00443         // this is a failure to process that object.
00444 #ifdef DEBUG_OCTOPUS_FILTERS
00445         LOG(astring("filter ") + current->group().text_form() + " denied "
00446             "infoton from " + id.text_form());
00447 #endif
00448         {
00449           GRAB_LOCK;  // short re-establishment of the lock.
00450           _disallow_removals--;
00451         }
00452         WHACK_RETURN(to_return, request);
00453       }
00454     }
00455   }
00456 
00457   // if we're here, then the infoton has been approved by all filters.
00458 
00459 #ifdef DEBUG_OCTOPUS_FILTERS
00460   LOG(astring("all filters approved infoton: ") + id.text_form());
00461 #endif
00462 
00463   // locate the appropriate tentacle for this request.
00464   tentacle *found = _tentacles->find(request->classifier());
00465 
00466   _molock->unlock();
00467     // from here in, the octopus itself is not locked up.  but we have sent
00468     // the signal that no one must remove any tentacles for now.
00469 
00470   if (!found) {
00471 #ifdef DEBUG_OCTOPUS
00472     LOG(astring("tentacle not found for: ")
00473         + request->classifier().text_form());
00474 #endif
00475     GRAB_LOCK;  // short re-establishment of the lock.
00476     _disallow_removals--;
00477     WHACK_RETURN(tentacle::NOT_FOUND, request);
00478   }
00479   // make sure they want background execution and that the tentacle can
00480   // support this.
00481   if (!now && found->backgrounding()) {
00482     // pass responsibility over to the tentacle.
00483     outcome to_return = found->enqueue(request, id);
00484     GRAB_LOCK;  // short re-establishment of the lock.
00485     _disallow_removals--;
00486     return to_return;
00487   } else {
00488     // call the tentacle directly.
00489     byte_array ignored;
00490     outcome to_return = found->consume(*request, id, ignored);
00491     WHACK(request);
00492     GRAB_LOCK;  // short re-establishment of the lock.
00493     _disallow_removals--;
00494     return to_return;
00495   }
00496 }
00497 
00498 void octopus::periodic_cleaning()
00499 {
00500 //  FUNCDEF("periodic_cleaning");
00501   time_stamp next_time;
00502   {
00503     auto_synchronizer l(*_clean_lock);
00504     next_time = *_next_cleaning;
00505   }
00506   if (next_time < time_stamp()) {
00507     // the bin locks itself, so we don't need to grab the lock here.
00508     _responses->clean_out_deadwood(); 
00509     auto_synchronizer l(*_clean_lock);
00510       // lock before modifying the time stamp; only one writer.
00511     _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
00512   }
00513 }
00514 
00515 tentacle *octopus::lock_tentacle(const string_array &tentacle_name)
00516 {
00517   if (!tentacle_name.length()) return NIL;
00518   _molock->lock();
00519   tentacle *found = _tentacles->find(tentacle_name);
00520   if (!found) {
00521     _molock->unlock();
00522     return NIL;
00523   }
00524   return found;
00525 }
00526 
00527 octopus_entity octopus::issue_identity()
00528 {
00529   return octopus_entity(*_name, application_configuration::process_id(), _sequencer->next_id(),
00530       _rando->inclusive(0, MAXINT32 / 4));
00531 }
00532 
00533 } //namespace.
00534 
Generated on Sat Jan 28 04:22:43 2012 for hoople2 project by  doxygen 1.6.3