post_office.cpp

Go to the documentation of this file.
00001 #ifndef POST_OFFICE_IMPLEMENTATION_FILE
00002 #define POST_OFFICE_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : post_office                                                       *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 1998-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
#include "letter.h"
#include "mailbox.h"
#include "post_office.h"

#include <basis/mutex.h>
#include <basis/log_base.h>
#include <basis/portable.h>
#include <basis/set.h>
#include <data_struct/amorph.cpp>
#include <data_struct/unique_id.h>
#include <mechanisms/ithread.h>
#include <mechanisms/thread_cabinet.h>
#include <mechanisms/time_stamp.h>
#include <nodes/catalogable.h>
#include <nodes/object_catalog.h>
#include <opsystem/path_configuration.h>

#include <stddef.h>
00034 
00035 using namespace nodes;
00036 
00037 //#define DEBUG_POST_OFFICE
00038   // uncomment if you want the noisy version.
00039 
00040 #undef LOG
00041 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger(), a)
00042 
00043 const int CLEANING_INTERVAL = 14 * SECOND_ms;
00044   // the interval between cleaning of extra letters and dead mailboxes.
00045 
00046 const int SNOOZE_TIME_FOR_POSTMAN = 42;
00047   // we'll snooze for this long if absolutely nothing happened during the
00048   // thread's activation.  if things are going on, our snooze time is reduced
00049   // by the length of time we were delivering items.
00050 
00051 const int DELIVERIES_ALLOWED = 350;
00052   // the maximum number of deliveries we'll try to get done per thread run.
00053 
00055 
00056 //hmmm: arrhhh--maybe we need to spawn a thread per postal route.
00057 
00058 class postal_carrier : public ithread
00059 {
00060 public:
00061   postal_carrier(post_office &parent, const unique_int &route)
00062   : ithread(SNOOZE_TIME_FOR_POSTMAN, ithread::SLACK_INTERVAL),
00063     _parent(parent),
00064     _route(route)
00065   {}
00066 
00067   IMPLEMENT_CLASS_NAME("postal_carrier");
00068 
00069   void perform_activity(void *) {
00070     FUNCDEF("perform_activity");
00071     bool finished;
00072     try {
00073       finished = _parent.deliver_mail_on_route(_route, *this); 
00074     } catch(...) {
00075       LOG("caught exception during mail delivery!");
00076     }
00077     if (!finished) {
00078       // not finished delivering all items.
00079       reschedule();
00080     } else {
00081       reschedule(SNOOZE_TIME_FOR_POSTMAN);
00082     }
00083   }
00084 
00085 private:
00086   post_office &_parent;
00087   unique_int _route;
00088 };
00089 
00091 
00092 class postal_cache : public mailbox {};
00093 
00095 
00096 class tagged_mail_stop : public nodes::catalogable
00097 {
00098 public:
00099   mail_stop *_route;
00100   unique_int _thread_id;
00101 
00102   tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NIL,
00103           const unique_int &thread_id = 0)
00104       : catalogable(id), _route(route), _thread_id(thread_id) {}
00105 
00106   IMPLEMENT_CLASS_NAME("tagged_mail_stop");
00107 
00108   virtual istring catalogable_name() const {
00109     return isprintf("%s: id=%d, addr=%08lx, thr_id=%d",
00110         static_class_name(), id().raw_id(), _route, _thread_id.raw_id());
00111   }
00112 };
00113 
00114 class route_map : public object_catalog {};
00115 
00117 
00118 class letter_morph : public amorph<letter> {};
00119 
00121 
00122 post_office::post_office()
00123 : _post(new mailbox),
00124   _routes(new route_map),
00125   _stock_lock(new mutex),
00126   _next_cleaning(new time_stamp),
00127   _threads(new thread_cabinet)
00128 {
00129 }
00130 
00131 post_office::~post_office()
00132 {
00133   stop_serving();
00134   WHACK(_post);
00135   WHACK(_routes);
00136   WHACK(_next_cleaning);
00137   WHACK(_stock_lock);
00138   WHACK(_threads);
00139 }
00140 
00141 void post_office::show_routes(istring &to_fill)
00142 {
00143 //hmmm: simplify this; just use the int_set returning func and print that.
00144   objcat_iterator *iter = _routes->find_head();
00145   istring current_line;
00146   istring temp;
00147   if (_routes->elements(*iter))
00148     to_fill += istring("Mail Delivery Routes:") + log_base::platform_ending();
00149 
00150   for ( ; !iter->is_tail(); iter->next()) {
00151     const tagged_mail_stop *tag
00152         = dynamic_cast<const tagged_mail_stop *>(iter->cat());
00153     if (!tag) continue;
00154 
00155     temp = istring(istring::SPRINTF, "%d ", tag->id().raw_id());
00156     if (current_line.length() + temp.length() >= 80) {
00157       current_line += log_base::platform_ending();
00158       to_fill += current_line;
00159       current_line.reset();
00160     }
00161     current_line += temp;
00162   }
00163   _routes->iter_unlock(iter);
00164   // catch the last line we created.
00165   if (!!current_line) to_fill += current_line;
00166 }
00167 
00168 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
00169 
00170 void post_office::show_mail(istring &output)
00171 {
00172   output.reset();
00173   output += log_base::platform_ending();
00174   output += istring("Mailbox Contents at ") + utility::timestamp(true, true)
00175       + log_base::platform_ending() + log_base::platform_ending();
00176   istring box_state;
00177   _post->show(box_state); 
00178   if (box_state.t()) output += box_state;
00179   else
00180     output += istring("No items are awaiting delivery.")
00181         + log_base::platform_ending();
00182 }
00183 
00184 void post_office::drop_off(const unique_int &id, letter *package)
00185 {
00186   FUNCDEF("drop_off");
00187 #ifdef DEBUG_POST_OFFICE
00188   LOG(istring(istring::SPRINTF, "mailbox drop for %d: ", id)
00189       + package->text_form());
00190 #endif
00191   _post->drop_off(id, package); 
00192 #ifdef DEBUG_POST_OFFICE
00193   if (!route_listed(id)) {
00194     LOG(isprintf("letter for %d has no route!", id));
00195   }
00196 #endif
00197 }
00198 
00199 bool post_office::pick_up(const unique_int &id, letter * &package)
00200 {
00201   FUNCDEF("pick_up");
00202   bool to_return = _post->pick_up(id, package); 
00203 #ifdef DEBUG_POST_OFFICE
00204   if (to_return)
00205     LOG(istring(istring::SPRINTF, "mailbox grab for %d: ", id)
00206         + package->text_form());
00207 #endif
00208   return to_return;
00209 }
00210 
00211 bool post_office::route_listed(const unique_int &id)
00212 {
00213   int_set route_set;
00214   get_route_list(route_set);
00215   return route_set.member(id.raw_id());
00216 }
00217 
00218 void post_office::get_route_list(int_set &route_set)
00219 {
00220   // gather the set of routes that we should carry mail to.
00221   route_set.reset();
00222 
00223   objcat_iterator *iter = _routes->find_head();
00224   if (!_routes->elements(*iter)) {
00225     // if there are no elements, why bother iterating?
00226     _routes->iter_unlock(iter);
00227     return;
00228   }
00229 
00230   for ( ; !iter->is_tail(); iter->next()) {
00231     const tagged_mail_stop *tag
00232         = dynamic_cast<const tagged_mail_stop *>(iter->cat());
00233     if (!tag) continue;
00234     route_set.add(tag->id().raw_id());
00235   }
00236   _routes->iter_unlock(iter);
00237 }
00238 
00239 void post_office::clean_package_list(post_office &formal(post),
00240     letter_morph &to_clean)
00241 {
00242   FUNCDEF("clean_package_list");
00243   // recycle all the stuff we had in the list.
00244   while (to_clean.elements()) {
00245     letter *package = to_clean.acquire(0);
00246     to_clean.zap(0, 0);
00247     if (!package) {
00248       LOG("saw empty package in list to clean!");
00249       continue;
00250     }
00251     WHACK(package);
00252   }
00253 }
00254 
00255 bool post_office::deliver_mail_on_route(const unique_int &route,
00256     ithread &carrier)
00257 {
00258   FUNCDEF("deliver_mail_on_route");
00259 #ifdef DEBUG_POST_OFFICE
00260   time_stamp enter;
00261 #endif
00262   if (carrier.should_stop()) return true;  // get out if thread was told to.
00263 
00264   int deliveries = 0;  // number of items delivered so far.
00265   letter_morph items_for_route;
00266     // holds the items that need to be sent to this route.
00267 
00268   // pickup all of the mail that we can for this route.
00269   while (deliveries < DELIVERIES_ALLOWED) {
00270     if (carrier.should_stop())
00271       return true;  // get out if thread was told to.
00272     letter *package;
00273     if (!_post->pick_up(route, package)) {
00274       // there are no more letters for this route.
00275       break;  // skip out of the loop.
00276     }
00277     deliveries++;  // count this item as a delivery.
00278     items_for_route.append(package);
00279   }
00280 
00281   if (!items_for_route.elements()) return true;  // nothing to handle.
00282 
00283   // locate the destination for this route.
00284   objcat_iterator *iter = _routes->find(route);  // find the route.
00285   if (!iter) {
00286     // we failed to find the route we wanted...
00287     LOG(istring(istring::SPRINTF, "route %d disappeared!", route.raw_id()));
00288     clean_package_list(*this, items_for_route);
00289     return true;
00290   }
00291 
00292   objcat_isolater *iso = _routes->separate(iter);
00293   if (!iso) {
00294     // some kind of bad error has occurred.
00295     LOG(isprintf("failed to separate route %d!", route.raw_id()));
00296     clean_package_list(*this, items_for_route);
00297     _routes->iter_unlock(iter);
00298     return true;
00299   }
00300   tagged_mail_stop *real_route = dynamic_cast<tagged_mail_stop *>(&iso->cat());
00301   if (!real_route) {
00302     // another kind of bad error has occurred.
00303     LOG(isprintf("route was wrong type for %d!", route.raw_id()));
00304     _routes->rejoin(iso);  // return the lock on this route.
00305     clean_package_list(*this, items_for_route);
00306     return true;
00307   }
00308 
00309   // now deliver what we found for this route.
00310   for (int t = 0; t < items_for_route.elements(); t++) {
00311     if (carrier.should_stop()) {
00312       // get out if thread was told to.
00313       _routes->rejoin(iso);  // return the lock on this route.
00314       return true;
00315     }
00316     letter *package = items_for_route.acquire(t);
00317     // hand the package out on the route.
00318     mail_stop::items_to_deliver pack(route, package);
00319     real_route->_route->invoke_callback(pack);
00320       // the callee is responsible for cleaning up.
00321   }
00322 
00323   _routes->rejoin(iso);  // return the lock on this route.
00324 
00325   bool finished_all = (deliveries < DELIVERIES_ALLOWED);
00326     // true if we handled everything we could have.
00327 
00328   if (carrier.should_stop()) return true;  // get out if thread was told to.
00329 
00330   // this bit is for the post office at large, but we don't want an extra
00331   // thread when we've got all these others handy.
00332   bool cleaning_time = false;
00333   {
00334     // short lock to look at the time for next cleaning.
00335     auto_synchronizer l(*_stock_lock);
00336     cleaning_time = time_stamp() > *_next_cleaning;
00337   }
00338   if (cleaning_time) {
00339     _post->clean_up();  // get rid of dead mailboxes in main post office.
00340     _next_cleaning->reset(CLEANING_INTERVAL);
00341   }
00342 
00343   time_stamp exit;
00344 #ifdef DEBUG_POST_OFFICE
00345   int duration = int(exit.value() - enter.value());
00346   if (duration > 20)
00347     LOG(isprintf("deliveries took %d ms.", duration));
00348 #endif
00349   return finished_all;
00350 }
00351 
00352 bool post_office::register_route(const unique_int &id,
00353     mail_stop &carrier_path)
00354 {
00355   objcat_iterator *iter = _routes->find(id);
00356   if (iter) {
00357     _routes->iter_unlock(iter);
00358     return false;  // already exists.
00359   }
00360   postal_carrier *po = new postal_carrier(*this, id);
00361   unique_int thread_id = _threads->add_thread(po, false, NIL);
00362     // add the thread so we can record its id.
00363   tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
00364       thread_id);
00365   _routes->add(new_stop);
00366     // add the mail stop to our listings.
00367   po->start(NIL);
00368     // now start the thread so it can begin cranking.
00369   return true;
00370 }
00371 
00372 bool post_office::unregister_route(const unique_int &id)
00373 {
00374   objcat_iterator *iter = _routes->find(id);
00375   if (!iter) return false;  // doesn't exist yet.
00376   unique_int thread_id = 0;
00377   const tagged_mail_stop *tag
00378       = dynamic_cast<const tagged_mail_stop *>(iter->cat());
00379   if (tag) thread_id = tag->_thread_id;
00380     // if the "tag" is nil, then something is badly wrong.
00381   _routes->iter_unlock(iter);
00382   _routes->zap_id(id);
00383   _threads->zap_thread(thread_id);
00384   return true;
00385 }
00386 
00387 
00388 #endif //POST_OFFICE_IMPLEMENTATION_FILE
00389 

Generated on Fri Nov 28 04:29:30 2008 for HOOPLE Libraries by  doxygen 1.5.1