post_office.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : post_office                                                       *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 1998-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include "ethread.h"
00016 #include "letter.h"
00017 #include "mailbox.h"
00018 #include "post_office.h"
00019 #include "thread_cabinet.h"
00020 
00021 #include <basis/mutex.h>
00022 #include <configuration/application_configuration.h>
00023 #include <loggers/program_wide_logger.h>
00024 #include <structures/set.h>
00025 #include <structures/amorph.h>
00026 #include <structures/unique_id.h>
00027 #include <textual/parser_bits.h>
00028 #include <timely/time_stamp.h>
00029 
00030 using namespace basis;
00031 using namespace configuration;
00032 using namespace loggers;
00033 using namespace structures;
00034 using namespace textual;
00035 using namespace timely;
00036 
00037 namespace processes {
00038 
00039 //#define DEBUG_POST_OFFICE
00040   // uncomment if you want the noisy version.
00041 
00042 #undef LOG
00043 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
00044 
00045 const int CLEANING_INTERVAL = 14 * SECOND_ms;
00046   // the interval between cleaning of extra letters and dead mailboxes.
00047 
00048 const int SNOOZE_TIME_FOR_POSTMAN = 42;
00049   // we'll snooze for this long if absolutely nothing happened during the
00050   // thread's activation.  if things are going on, our snooze time is reduced
00051   // by the length of time we were delivering items.
00052 
00053 const int DELIVERIES_ALLOWED = 350;
00054   // the maximum number of deliveries we'll try to get done per thread run.
00055 
00057 
00058 //hmmm: arrhhh--maybe we need to spawn a thread per postal route.
00059 
00060 class postal_carrier : public ethread
00061 {
00062 public:
00063   postal_carrier(post_office &parent, const unique_int &route)
00064   : ethread(SNOOZE_TIME_FOR_POSTMAN, ethread::SLACK_INTERVAL),
00065     _parent(parent),
00066     _route(route)
00067   {}
00068 
00069   DEFINE_CLASS_NAME("postal_carrier");
00070 
00071   void perform_activity(void *) {
00072     FUNCDEF("perform_activity");
00073     bool finished;
00074     try {
00075       finished = _parent.deliver_mail_on_route(_route, *this); 
00076     } catch(...) {
00077       LOG(astring("caught exception during mail delivery!"));
00078     }
00079     if (!finished) {
00080       // not finished delivering all items.
00081       reschedule();
00082     } else {
00083       reschedule(SNOOZE_TIME_FOR_POSTMAN);
00084     }
00085   }
00086 
00087 private:
00088   post_office &_parent;
00089   unique_int _route;
00090 };
00091 
00093 
00094 class postal_cache : public mailbox {};
00095 
00097 
00098 class tagged_mail_stop : public virtual text_formable
00099 {
00100 public:
00101   mail_stop *_route;
00102   unique_int _thread_id;
00103   unique_int _id;
00104 
00105   tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NIL,
00106           const unique_int &thread_id = 0)
00107       : _route(route), _thread_id(thread_id), _id(id) {}
00108 
00109   DEFINE_CLASS_NAME("tagged_mail_stop");
00110 
00111   virtual void text_form(basis::base_string &fill) const {
00112     fill.assign(text_form());
00113   }
00114 
00115   virtual astring text_form() const {
00116     return a_sprintf("%s: id=%d, addr=%08lx, thr_id=%d",
00117         static_class_name(), _id.raw_id(), _route, _thread_id.raw_id());
00118   }
00119 };
00120 
00122 
00123 class route_map : public amorph<tagged_mail_stop>
00124 {
00125 public:
00126   tagged_mail_stop *find(const unique_int &id) {
00127     for (int i = 0; i < elements(); i++) {
00128       tagged_mail_stop *curr = borrow(i);
00129       if (curr && (curr->_id == id)) return curr;
00130     }
00131     return NIL;
00132   }
00133 
00134   bool zap(const unique_int &id) {
00135     for (int i = 0; i < elements(); i++) {
00136       tagged_mail_stop *curr = borrow(i);
00137       if (curr && (curr->_id == id)) {
00138         amorph<tagged_mail_stop>::zap(i, i);
00139         return true;
00140       }
00141     }
00142     return false;
00143   }
00144 
00145 };
00146 
00148 
00149 class letter_morph : public amorph<letter> {};
00150 
00152 
00153 post_office::post_office()
00154 : _post(new mailbox),
00155   _routes(new route_map),
00156   _next_cleaning(new time_stamp),
00157   _threads(new thread_cabinet)
00158 {
00159 }
00160 
00161 post_office::~post_office()
00162 {
00163   stop_serving();
00164   WHACK(_post);
00165   WHACK(_routes);
00166   WHACK(_next_cleaning);
00167   WHACK(_threads);
00168 }
00169 
00170 void post_office::show_routes(astring &to_fill)
00171 {
00172   auto_synchronizer l(c_mutt);
00173 //hmmm: simplify this; just use the int_set returning func and print that.
00174   astring current_line;
00175   astring temp;
00176   if (_routes->elements())
00177     to_fill += astring("Mail Delivery Routes:") + parser_bits::platform_eol_to_chars();
00178 
00179   for (int i = 0; i < _routes->elements(); i++) {
00180     const tagged_mail_stop *tag = _routes->get(i);
00181     if (!tag) continue;
00182     temp = astring(astring::SPRINTF, "%d ", tag->_id.raw_id());
00183     if (current_line.length() + temp.length() >= 80) {
00184       current_line += parser_bits::platform_eol_to_chars();
00185       to_fill += current_line;
00186       current_line.reset();
00187     }
00188     current_line += temp;
00189   }
00190   // catch the last line we created.
00191   if (!!current_line) to_fill += current_line;
00192 }
00193 
00194 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
00195 
00196 void post_office::show_mail(astring &output)
00197 {
00198   output.reset();
00199   output += parser_bits::platform_eol_to_chars();
00200   output += astring("Mailbox Contents at ") + time_stamp::notarize(true)
00201       + parser_bits::platform_eol_to_chars() + parser_bits::platform_eol_to_chars();
00202   astring box_state;
00203   _post->show(box_state); 
00204   if (box_state.t()) output += box_state;
00205   else
00206     output += astring("No items are awaiting delivery.")
00207         + parser_bits::platform_eol_to_chars();
00208 }
00209 
00210 void post_office::drop_off(const unique_int &id, letter *package)
00211 {
00212 #ifdef DEBUG_POST_OFFICE
00213   FUNCDEF("drop_off");
00214   LOG(astring(astring::SPRINTF, "mailbox drop for %d: ", id)
00215       + package->text_form());
00216 #endif
00217   _post->drop_off(id, package); 
00218 #ifdef DEBUG_POST_OFFICE
00219   if (!route_listed(id)) {
00220     LOG(a_sprintf("letter for %d has no route!", id));
00221   }
00222 #endif
00223 }
00224 
00225 bool post_office::pick_up(const unique_int &id, letter * &package)
00226 {
00227 #ifdef DEBUG_POST_OFFICE
00228   FUNCDEF("pick_up");
00229 #endif
00230   bool to_return = _post->pick_up(id, package); 
00231 #ifdef DEBUG_POST_OFFICE
00232   if (to_return)
00233     LOG(astring(astring::SPRINTF, "mailbox grab for %d: ", id)
00234         + package->text_form());
00235 #endif
00236   return to_return;
00237 }
00238 
00239 bool post_office::route_listed(const unique_int &id)
00240 {
00241   int_set route_set;
00242   get_route_list(route_set);
00243   return route_set.member(id.raw_id());
00244 }
00245 
00246 void post_office::get_route_list(int_set &route_set)
00247 {
00248   auto_synchronizer l(c_mutt);
00249 
00250   // gather the set of routes that we should carry mail to.
00251   route_set.reset();
00252 
00253   if (!_routes->elements()) {
00254     // if there are no elements, why bother iterating?
00255     return;
00256   }
00257 
00258   for (int i = 0; i < _routes->elements(); i++) {
00259     const tagged_mail_stop *tag = _routes->get(i);
00260     if (!tag) continue;
00261     route_set.add(tag->_id.raw_id());
00262   }
00263 }
00264 
00265 void post_office::clean_package_list(post_office &formal(post),
00266     letter_morph &to_clean)
00267 {
00268   FUNCDEF("clean_package_list");
00269   auto_synchronizer l(c_mutt);
00270 
00271   // recycle all the stuff we had in the list.
00272   while (to_clean.elements()) {
00273     letter *package = to_clean.acquire(0);
00274     to_clean.zap(0, 0);
00275     if (!package) {
00276       LOG("saw empty package in list to clean!");
00277       continue;
00278     }
00279     WHACK(package);
00280   }
00281 }
00282 
00283 bool post_office::deliver_mail_on_route(const unique_int &route,
00284     ethread &carrier)
00285 {
00286   FUNCDEF("deliver_mail_on_route");
00287   auto_synchronizer l(c_mutt);
00288 
00289 #ifdef DEBUG_POST_OFFICE
00290   time_stamp enter;
00291 #endif
00292   if (carrier.should_stop()) return true;  // get out if thread was told to.
00293 
00294   int deliveries = 0;  // number of items delivered so far.
00295   letter_morph items_for_route;
00296     // holds the items that need to be sent to this route.
00297 
00298   // pickup all of the mail that we can for this route.
00299   while (deliveries < DELIVERIES_ALLOWED) {
00300     if (carrier.should_stop())
00301       return true;  // get out if thread was told to.
00302     letter *package;
00303     if (!_post->pick_up(route, package)) {
00304       // there are no more letters for this route.
00305       break;  // skip out of the loop.
00306     }
00307     deliveries++;  // count this item as a delivery.
00308     items_for_route.append(package);
00309   }
00310 
00311   if (!items_for_route.elements()) return true;  // nothing to handle.
00312 
00313   // locate the destination for this route.
00314   tagged_mail_stop *real_route = _routes->find(route);  // find the route.
00315   if (!real_route) {
00316     // we failed to find the route we wanted...
00317     LOG(astring(astring::SPRINTF, "route %d disappeared!", route.raw_id()));
00318     clean_package_list(*this, items_for_route);
00319     return true;
00320   }
00321 
00322   // now deliver what we found for this route.
00323   for (int t = 0; t < items_for_route.elements(); t++) {
00324     if (carrier.should_stop()) {
00325       // get out if thread was told to.
00326       return true;
00327     }
00328     letter *package = items_for_route.acquire(t);
00329     // hand the package out on the route.
00330     mail_stop::items_to_deliver pack(route, package);
00331     real_route->_route->invoke_callback(pack);
00332       // the callee is responsible for cleaning up.
00333   }
00334 
00335   bool finished_all = (deliveries < DELIVERIES_ALLOWED);
00336     // true if we handled everything we could have.
00337 
00338   if (carrier.should_stop()) return true;  // get out if thread was told to.
00339 
00340   // this bit is for the post office at large, but we don't want an extra
00341   // thread when we've got all these others handy.
00342   bool cleaning_time = time_stamp() > *_next_cleaning;
00343   if (cleaning_time) {
00344     _post->clean_up();  // get rid of dead mailboxes in main post office.
00345     _next_cleaning->reset(CLEANING_INTERVAL);
00346   }
00347 
00348   time_stamp exit;
00349 #ifdef DEBUG_POST_OFFICE
00350   int duration = int(exit.value() - enter.value());
00351   if (duration > 20)
00352     LOG(a_sprintf("deliveries took %d ms.", duration));
00353 #endif
00354   return finished_all;
00355 }
00356 
00357 bool post_office::register_route(const unique_int &id,
00358     mail_stop &carrier_path)
00359 {
00360   auto_synchronizer l(c_mutt);
00361 
00362   tagged_mail_stop *found = _routes->find(id);
00363   if (found) return false;  // already exists.
00364 
00365   postal_carrier *po = new postal_carrier(*this, id);
00366   unique_int thread_id = _threads->add_thread(po, false, NIL);
00367     // add the thread so we can record its id.
00368   tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
00369       thread_id);
00370   _routes->append(new_stop);
00371     // add the mail stop to our listings.
00372   po->start(NIL);
00373     // now start the thread so it can begin cranking.
00374   return true;
00375 }
00376 
00377 bool post_office::unregister_route(const unique_int &id)
00378 {
00379   auto_synchronizer l(c_mutt);
00380 
00381   tagged_mail_stop *tag = _routes->find(id);
00382   if (!tag) return false;  // doesn't exist yet.
00383   unique_int thread_id = tag->_id;
00384   _routes->zap(id);
00385   _threads->zap_thread(thread_id);
00386   return true;
00387 }
00388 
00389 } //namespace.
00390 
00391 
Generated on Sat Jan 28 04:22:24 2012 for hoople2 project by  doxygen 1.6.3