octopus.cpp

Go to the documentation of this file.
00001 #ifndef OCTOPUS_IMPLEMENTATION_FILE
00002 #define OCTOPUS_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : octopus                                                           *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "entity_data_bin.h"
00019 #include "entity_defs.h"
00020 #include "identity_tentacle.h"
00021 #include "infoton.h"
00022 #include "octopus.h"
00023 #include "tentacle.h"
00024 #include "unhandled_request.h"
00025 
00026 #include <basis/chaos.h>
00027 #include <basis/istring.h>
00028 #include <basis/log_base.h>
00029 #include <basis/mutex.h>
00030 #include <basis/portable.h>
00031 #include <data_struct/amorph.cpp>
00032 #include <data_struct/string_hash.cpp>
00033 #include <mechanisms/safe_roller.h>
00034 #include <mechanisms/time_stamp.h>
00035 
00036 //#define DEBUG_OCTOPUS
00037   // uncomment for debugging noise.
00038 //#define DEBUG_OCTOPUS_FILTERS
00039   // uncomment for noisy filter processing.
00040 
// GRAB_LOCK declares a scoped auto_synchronizer named "l" on the octopus's
// main mutex (_molock); the enclosing scope determines how long it lives.
#undef GRAB_LOCK
#define GRAB_LOCK auto_synchronizer l(*_molock)
00044 
// this macro returns a result and deletes the request due to a failure.  it
// stores a response for the request, in case they were expecting one, since
// otherwise they will wait a long time for a response that isn't coming.  if
// those responses are never picked up, they will eventually be cleaned out.
// the macro expects "id", "request" and "_responses" to be visible at the
// point of use.  the body is wrapped in do/while(0) so that the invocation
// plus its trailing semicolon forms exactly one statement, which keeps it
// safe inside an un-braced if/else.
#define WHACK_RETURN(to_ret, to_whack) do { \
  unhandled_request *bad_response = new unhandled_request(id, \
      request->classifier(), to_ret); \
  _responses->add_item(bad_response, id); \
  WHACK(to_whack); \
  return to_ret; \
} while (0)
00056 
00057 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
00058   // this is how much we'll toss out on closing an entity.
00059 
// LOG routes a message through CLASS_EMERGENCY_LOG using the program-wide
// logger.
#undef LOG
#define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger(), t)
00062 
00063 const int OCTOPUS_CHECKING_INTERVAL = 4 * MINUTE_ms;
00064   // the frequency in milliseconds of cleaning on the response bin.  this
00065   // doesn't need to happen very often; it only tosses data that has been
00066   // abandoned in the response bin.
00067 
00069 
00070 class filter_list : public array<tentacle *>
00071 {
00072 public:
00073   bool remove(tentacle *to_remove) {
00074     for (int i = 0; i < length(); i++) {
00075       if (get(i) == to_remove) {
00076         zap(i, i);
00077         return true;
00078       }
00079     }
00080     return false;
00081   }
00082 };
00083 
00085 
00086 class tentacle_record 
00087 {
00088 public:
00089   tentacle *_limb;
00090   bool _filter;
00091 
00092   tentacle_record(tentacle *limb, bool filter)
00093       : _limb(limb), _filter(filter) {}
00094 
00095   ~tentacle_record() { WHACK(_limb); }
00096 };
00097 
00099 
00100 class modula_oblongata : public amorph<tentacle_record>
00101 {
00102 public:
00103   modula_oblongata() : amorph<tentacle_record>() {}
00104 
00105   int find_index(const string_array &group) {
00106     for (int i = 0; i < elements(); i++) {
00107       if (borrow(i)->_limb->group().prefix_compare(group))
00108         return i;
00109     }
00110     return common::NOT_FOUND;
00111   }
00112 
00113   tentacle *find(const string_array &group) {
00114     int indy = find_index(group);
00115     if (negative(indy)) return NIL;
00116     return borrow(indy)->_limb;
00117   }
00118 
00119   bool zap(int a, int b) {
00120     outcome ret = amorph<tentacle_record>::zap(a, b);
00121     return ret == common::OKAY;
00122   }
00123 
00124   bool zap(const string_array &group) {
00125     int indy = find_index(group);
00126     if (negative(indy)) return false;
00127     amorph<tentacle_record>::zap(indy, indy);
00128     return true;
00129   }
00130 };
00131 
00133 
00134 octopus::octopus(const istring &name, int max_per_ent)
00135 : _name(new istring(name)),
00136   _tentacles(new modula_oblongata),
00137   _molock(new mutex),
00138   _responses(new entity_data_bin(max_per_ent)),
00139   _disallow_removals(0),
00140   _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
00141   _clean_lock(new mutex),
00142   _filters(new filter_list),
00143   _sequencer(new safe_roller(1, MAXINT / 2)),
00144   _rando(new chaos)
00145 {
00146   add_tentacle(new identity_tentacle(*this), true);
00147     // register a way to issue identities.  this is a filter.
00148   add_tentacle(new unhandled_request_tentacle(false), false);
00149     // provide a way to unpack the unhandled_request object.
00150 }
00151 
00152 octopus::~octopus()
00153 {
00154   FUNCDEF("destructor");
00155   WHACK(_filters);
00156   WHACK(_tentacles);
00157   WHACK(_responses);
00158   WHACK(_next_cleaning);
00159   WHACK(_clean_lock);
00160   WHACK(_name);
00161   WHACK(_molock);
00162   WHACK(_rando);
00163   WHACK(_sequencer);
00164 }
00165 
00166 void octopus::lock_tentacles() { _molock->lock(); }
00167 
00168 void octopus::unlock_tentacles() { _molock->unlock(); }
00169 
00170 entity_data_bin &octopus::responses() { return *_responses; }
00171 
00172 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
00173 
00174 const istring &octopus::name() const { return *_name; }
00175 
00176 tentacle *octopus::locked_get_tentacle(int indy)
00177 { return _tentacles->borrow(indy)->_limb; }
00178 
00179 infoton *octopus::acquire_specific_result(const octopus_request_id &id)
00180 { return _responses->acquire_for_identifier(id); }
00181 
00182 infoton *octopus::acquire_result(const octopus_entity &requester,
00183     octopus_request_id &id)
00184 { return _responses->acquire_for_entity(requester, id); }
00185 
00186 void octopus::unlock_tentacle(tentacle *to_unlock)
00187 {
00188   to_unlock = NIL;
00189   _molock->unlock();
00190 }
00191 
00192 void octopus::expunge(const octopus_entity &to_remove)
00193 {
00194   FUNCDEF("expunge");
00195   {
00196     // temporary lock so we can keep tentacles from evaporating.
00197     GRAB_LOCK;
00198     _disallow_removals++;
00199   }
00200 
00201   // we've now ensured that no tentacles will be removed, so at most the
00202   // list would get longer.  we'll settle on its current length.
00203   int len = _tentacles->elements();
00204   for (int i = 0; i < len; i++) {
00205     tentacle_record *curr = _tentacles->borrow(i);
00206     if (!curr || !curr->_limb) {
00207 //complain... logic error.
00208       continue;
00209     }
00210     // activate the expunge method on the current tentacle.
00211     curr->_limb->expunge(to_remove);
00212   }
00213 
00214   {
00215     // re-enable tentacle removals.
00216     GRAB_LOCK;
00217     _disallow_removals--;
00218   }
00219 
00220   // throw out any data that was waiting for that guy.
00221   int items_found = 1;
00222   infoton_list junk;
00223   while (items_found) {
00224     // grab a chunk of items to be trashed.
00225     items_found = responses().acquire_for_entity(to_remove, junk,
00226         MAXIMUM_TRASH_SIZE);
00227     junk.reset();
00228 //#ifdef DEBUG_OCTOPUS
00229     if (items_found)
00230       LOG(isprintf("cleaned %d items for expunged entity ", items_found)
00231           + to_remove.mangled_form());
00232 //#endif
00233   }
00234 
00235 }
00236 
00237 outcome octopus::zap_tentacle(const string_array &tentacle_name)
00238 {
00239   tentacle *found = NIL;
00240   outcome ret = remove_tentacle(tentacle_name, found);
00241   WHACK(found);
00242   return ret;
00243 }
00244 
00245 outcome octopus::add_tentacle(tentacle *to_add, bool filter)
00246 {
00247   FUNCDEF("add_tentacle");
00248   if (!to_add) return tentacle::BAD_INPUT;
00249   if (!to_add->group().length()) return tentacle::BAD_INPUT;
00250   outcome zapped_it = zap_tentacle(to_add->group());
00251   if (zapped_it == tentacle::OKAY) {
00252 //#ifdef DEBUG_OCTOPUS
00253     LOG(istring("removed existing tentacle: ") + to_add->group().text_form());
00254 //#endif
00255   }
00256   GRAB_LOCK;
00257   tentacle *found = _tentacles->find(to_add->group());
00258   // if found is non-NIL, then that would be a serious logic error since
00259   // we just zapped it above.
00260   if (found) return tentacle::ALREADY_EXISTS;
00261   to_add->attach_storage(*_responses);
00262   tentacle_record *new_record = new tentacle_record(to_add, filter);
00263   _tentacles->append(new_record);
00264   if (filter) *_filters += to_add;
00265 #ifdef DEBUG_OCTOPUS
00266   LOG(istring("added tentacle on ") + to_add->group().text_form());
00267 #endif
00268   return tentacle::OKAY;
00269 }
00270 
00271 outcome octopus::remove_tentacle(const string_array &group_name,
00272     tentacle * &free_me)
00273 {
00274   FUNCDEF("remove_tentacle");
00275   free_me = NIL;
00276   if (!group_name.length()) return tentacle::BAD_INPUT;
00277   while (true) {
00278     // repeatedly grab the lock and make sure we're allowed to remove.  if
00279     // we're told we can't remove yet, then we drop the lock again and pause.
00280     _molock->lock();
00281     if (!_disallow_removals) {
00282       // we ARE allowed to remove it right now.  we leave the loop in
00283       // possession of the lock.
00284       break;
00285     }
00286     if (_disallow_removals < 0) {
00287       continuable_error(class_name(), func, "logic error in removal "
00288           "reference counter.");
00289     }
00290     _molock->unlock();
00291     portable::sleep_ms(0);  // yield thread's execution to another thread.
00292   }
00293   int indy = _tentacles->find_index(group_name);
00294   if (negative(indy)) {
00295     // nope, no match.
00296     _molock->unlock();
00297     return tentacle::NOT_FOUND;
00298   }
00299   // found the match.
00300   tentacle_record *freeing = _tentacles->acquire(indy);
00301   _tentacles->zap(indy, indy);
00302   free_me = freeing->_limb;
00303   _filters->remove(free_me);
00304   _molock->unlock();
00305   freeing->_limb = NIL;
00306   WHACK(freeing);
00307   return tentacle::OKAY;
00308 }
00309 
00310 outcome octopus::restore(const string_array &classifier,
00311     byte_array &packed_form, infoton * &reformed)
00312 {
00313   FUNCDEF("restore");
00314   periodic_cleaning();  // freshen up if it's that time.
00315 
00316   reformed = NIL;
00317   if (!classifier.length()) return tentacle::BAD_INPUT;
00318   if (!packed_form.length()) return tentacle::BAD_INPUT;
00319   if (!classifier.length()) return tentacle::BAD_INPUT;
00320   {
00321     // keep anyone from being removed until we're done.
00322     GRAB_LOCK;
00323     _disallow_removals++;
00324   }
00325   tentacle *found = _tentacles->find(classifier);
00326   outcome to_return;
00327   if (!found) {
00328 #ifdef DEBUG_OCTOPUS
00329     LOG(istring("tentacle not found for: ") + classifier.text_form());
00330 #endif
00331     to_return = tentacle::NOT_FOUND;
00332   } else {
00333     to_return = found->reconstitute(classifier, packed_form, reformed);
00334   }
00335   // re-enable tentacle removals.
00336   GRAB_LOCK;
00337   _disallow_removals--;
00338   return to_return;
00339 }
00340 
00341 outcome octopus::evaluate(infoton *request, const octopus_request_id &id,
00342     bool now)
00343 {
00344   FUNCDEF("evaluate");
00345   periodic_cleaning();  // freshen up if it's that time.
00346 
00347   // check that the classifier is well formed.
00348   if (!request->classifier().length()) {
00349 #ifdef DEBUG_OCTOPUS
00350     LOG("failed due to empty classifier.");
00351 #endif
00352     WHACK_RETURN(tentacle::BAD_INPUT, request);
00353   }
00354 
00355   _molock->lock();
00356 
00357   // block tentacle removals while we're working.
00358   _disallow_removals++;
00359 
00360   // ensure that we pass this infoton through all the filters for vetting.
00361   for (int i = 0; i < _filters->length(); i++) {
00362     tentacle *current = (*_filters)[i];
00363 #ifdef DEBUG_OCTOPUS_FILTERS
00364     LOG(isprintf("%d: checking ", i + 1) + current->group().text_form());
00365 #endif
00366 
00367     // check if the infoton is addressed specifically by this filter.
00368     bool is_relevant = current->group().prefix_compare(request->classifier());
00369 
00370 #ifdef DEBUG_OCTOPUS_FILTERS
00371     if (is_relevant)
00372       LOG(istring("found it to be relevant!  for ") + id.text_form())
00373     else
00374       LOG(istring("found it to not be relevant.  for ") + id.text_form());
00375 #endif
00376 
00377     // this infoton is _for_ this filter.
00378     _molock->unlock();
00379       // unlock octopus to allow others to operate.
00380 
00381     byte_array transformed;
00382 //hmmm: maybe there should be a separate filter method?
00383     outcome to_return = current->consume(*request, id, transformed);
00384       // pass the infoton into the current filter.
00385 
00386     if (is_relevant) {
00387       // the infoton was _for_ the current filter.  that means that we are
00388       // done processing it now.
00389 #ifdef DEBUG_OCTOPUS_FILTERS
00390       LOG(istring("filter ") + current->group().text_form() + " consumed "
00391           "infoton from " + id.text_form() + " with result "
00392           + tentacle::outcome_name(to_return));
00393 #endif
00394       WHACK(request);
00395       GRAB_LOCK;  // short re-establishment of the lock.
00396       _disallow_removals--;
00397       return to_return;
00398     } else {
00399       // the infoton was vetted by the filter.  make sure it was liked.
00400 #ifdef DEBUG_OCTOPUS_FILTERS
00401       LOG(istring("filter ") + current->group().text_form() + " vetted "
00402           "infoton " + id.text_form() + " with result "
00403           + tentacle::outcome_name(to_return));
00404 #endif
00405       if (to_return == tentacle::PARTIAL) {
00406         // if the infoton is partially complete, then we're allowed to keep
00407         // going.  this outcome means it was not prohibited.
00408 
00409         // make sure they didn't switch it out on us.
00410         if (transformed.length()) {
00411           // we need to substitute the transformed version for the original.
00412           string_array classif;
00413           byte_array decro;  // decrypted packed infoton.
00414           bool worked = infoton::fast_unpack(transformed, classif, decro);
00415           if (!worked) {
00416             LOG("failed to fast_unpack the transformed data.");
00417           } else {
00418             infoton *new_req = NIL;
00419             outcome rest_ret = restore(classif, decro, new_req);
00420             if (rest_ret == tentacle::OKAY) {
00421               // we got a good transformed version.
00422               WHACK(request);
00423               request = new_req;  // substitution complete.
00424             } else {
00425               LOG("failed to restore transformed infoton.");
00426             }
00427           }
00428         }
00429 
00430         _molock->lock();  // get the lock again.
00431         continue;
00432       } else {
00433         // this is a failure to process that object.
00434 #ifdef DEBUG_OCTOPUS_FILTERS
00435         LOG(istring("filter ") + current->group().text_form() + " denied "
00436             "infoton from " + id.text_form());
00437 #endif
00438         {
00439           GRAB_LOCK;  // short re-establishment of the lock.
00440           _disallow_removals--;
00441         }
00442         WHACK_RETURN(to_return, request);
00443       }
00444     }
00445   }
00446 
00447   // if we're here, then the infoton has been approved by all filters.
00448 
00449 #ifdef DEBUG_OCTOPUS_FILTERS
00450   LOG(istring("all filters approved infoton: ") + id.text_form());
00451 #endif
00452 
00453   // locate the appropriate tentacle for this request.
00454   tentacle *found = _tentacles->find(request->classifier());
00455 
00456   _molock->unlock();
00457     // from here in, the octopus itself is not locked up.  but we have sent
00458     // the signal that no one must remove any tentacles for now.
00459 
00460   if (!found) {
00461 #ifdef DEBUG_OCTOPUS
00462     LOG(istring("tentacle not found for: ")
00463         + request->classifier().text_form());
00464 #endif
00465     GRAB_LOCK;  // short re-establishment of the lock.
00466     _disallow_removals--;
00467     WHACK_RETURN(tentacle::NOT_FOUND, request);
00468   }
00469   // make sure they want background execution and that the tentacle can
00470   // support this.
00471   if (!now && found->backgrounding()) {
00472     // pass responsibility over to the tentacle.
00473     outcome to_return = found->enqueue(request, id);
00474     GRAB_LOCK;  // short re-establishment of the lock.
00475     _disallow_removals--;
00476     return to_return;
00477   } else {
00478     // call the tentacle directly.
00479     byte_array ignored;
00480     outcome to_return = found->consume(*request, id, ignored);
00481     WHACK(request);
00482     GRAB_LOCK;  // short re-establishment of the lock.
00483     _disallow_removals--;
00484     return to_return;
00485   }
00486 }
00487 
00488 void octopus::periodic_cleaning()
00489 {
00490   FUNCDEF("periodic_cleaning");
00491   time_stamp next_time;
00492   {
00493     auto_synchronizer l(*_clean_lock);
00494     next_time = *_next_cleaning;
00495   }
00496   if (next_time < time_stamp()) {
00497     // the bin locks itself, so we don't need to grab the lock here.
00498     _responses->clean_out_deadwood(); 
00499     auto_synchronizer l(*_clean_lock);
00500       // lock before modifying the time stamp; only one writer.
00501     _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
00502   }
00503 }
00504 
00505 tentacle *octopus::lock_tentacle(const string_array &tentacle_name)
00506 {
00507   if (!tentacle_name.length()) return NIL;
00508   _molock->lock();
00509   tentacle *found = _tentacles->find(tentacle_name);
00510   if (!found) {
00511     _molock->unlock();
00512     return NIL;
00513   }
00514   return found;
00515 }
00516 
00517 octopus_entity octopus::issue_identity()
00518 {
00519   return octopus_entity(*_name, portable::process_id(), _sequencer->next_id(),
00520       _rando->inclusive(0, MAXINT / 4));
00521 }
00522 
00523 
00524 #endif //OCTOPUS_IMPLEMENTATION_FILE
00525 

Generated on Fri Aug 29 04:28:59 2008 for HOOPLE Libraries by  doxygen 1.5.1