00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "ethread.h"
00016 #include "letter.h"
00017 #include "mailbox.h"
00018 #include "post_office.h"
00019 #include "thread_cabinet.h"
00020
00021 #include <basis/mutex.h>
00022 #include <configuration/application_configuration.h>
00023 #include <loggers/program_wide_logger.h>
00024 #include <structures/set.h>
00025 #include <structures/amorph.h>
00026 #include <structures/unique_id.h>
00027 #include <textual/parser_bits.h>
00028 #include <timely/time_stamp.h>
00029
00030 using namespace basis;
00031 using namespace configuration;
00032 using namespace loggers;
00033 using namespace structures;
00034 using namespace textual;
00035 using namespace timely;
00036
00037 namespace processes {
00038
00039
00040
00041
00042 #undef LOG
00043 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
00044
00045 const int CLEANING_INTERVAL = 14 * SECOND_ms;
00046
00047
00048 const int SNOOZE_TIME_FOR_POSTMAN = 42;
00049
00050
00051
00052
00053 const int DELIVERIES_ALLOWED = 350;
00054
00055
00057
00058
00059
00060 class postal_carrier : public ethread
00061 {
00062 public:
00063 postal_carrier(post_office &parent, const unique_int &route)
00064 : ethread(SNOOZE_TIME_FOR_POSTMAN, ethread::SLACK_INTERVAL),
00065 _parent(parent),
00066 _route(route)
00067 {}
00068
00069 DEFINE_CLASS_NAME("postal_carrier");
00070
00071 void perform_activity(void *) {
00072 FUNCDEF("perform_activity");
00073 bool finished;
00074 try {
00075 finished = _parent.deliver_mail_on_route(_route, *this);
00076 } catch(...) {
00077 LOG(astring("caught exception during mail delivery!"));
00078 }
00079 if (!finished) {
00080
00081 reschedule();
00082 } else {
00083 reschedule(SNOOZE_TIME_FOR_POSTMAN);
00084 }
00085 }
00086
00087 private:
00088 post_office &_parent;
00089 unique_int _route;
00090 };
00091
00093
00094 class postal_cache : public mailbox {};
00095
00097
00098 class tagged_mail_stop : public virtual text_formable
00099 {
00100 public:
00101 mail_stop *_route;
00102 unique_int _thread_id;
00103 unique_int _id;
00104
00105 tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NIL,
00106 const unique_int &thread_id = 0)
00107 : _route(route), _thread_id(thread_id), _id(id) {}
00108
00109 DEFINE_CLASS_NAME("tagged_mail_stop");
00110
00111 virtual void text_form(basis::base_string &fill) const {
00112 fill.assign(text_form());
00113 }
00114
00115 virtual astring text_form() const {
00116 return a_sprintf("%s: id=%d, addr=%08lx, thr_id=%d",
00117 static_class_name(), _id.raw_id(), _route, _thread_id.raw_id());
00118 }
00119 };
00120
00122
00123 class route_map : public amorph<tagged_mail_stop>
00124 {
00125 public:
00126 tagged_mail_stop *find(const unique_int &id) {
00127 for (int i = 0; i < elements(); i++) {
00128 tagged_mail_stop *curr = borrow(i);
00129 if (curr && (curr->_id == id)) return curr;
00130 }
00131 return NIL;
00132 }
00133
00134 bool zap(const unique_int &id) {
00135 for (int i = 0; i < elements(); i++) {
00136 tagged_mail_stop *curr = borrow(i);
00137 if (curr && (curr->_id == id)) {
00138 amorph<tagged_mail_stop>::zap(i, i);
00139 return true;
00140 }
00141 }
00142 return false;
00143 }
00144
00145 };
00146
00148
00149 class letter_morph : public amorph<letter> {};
00150
00152
00153 post_office::post_office()
00154 : _post(new mailbox),
00155 _routes(new route_map),
00156 _next_cleaning(new time_stamp),
00157 _threads(new thread_cabinet)
00158 {
00159 }
00160
00161 post_office::~post_office()
00162 {
00163 stop_serving();
00164 WHACK(_post);
00165 WHACK(_routes);
00166 WHACK(_next_cleaning);
00167 WHACK(_threads);
00168 }
00169
00170 void post_office::show_routes(astring &to_fill)
00171 {
00172 auto_synchronizer l(c_mutt);
00173
00174 astring current_line;
00175 astring temp;
00176 if (_routes->elements())
00177 to_fill += astring("Mail Delivery Routes:") + parser_bits::platform_eol_to_chars();
00178
00179 for (int i = 0; i < _routes->elements(); i++) {
00180 const tagged_mail_stop *tag = _routes->get(i);
00181 if (!tag) continue;
00182 temp = astring(astring::SPRINTF, "%d ", tag->_id.raw_id());
00183 if (current_line.length() + temp.length() >= 80) {
00184 current_line += parser_bits::platform_eol_to_chars();
00185 to_fill += current_line;
00186 current_line.reset();
00187 }
00188 current_line += temp;
00189 }
00190
00191 if (!!current_line) to_fill += current_line;
00192 }
00193
00194 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
00195
00196 void post_office::show_mail(astring &output)
00197 {
00198 output.reset();
00199 output += parser_bits::platform_eol_to_chars();
00200 output += astring("Mailbox Contents at ") + time_stamp::notarize(true)
00201 + parser_bits::platform_eol_to_chars() + parser_bits::platform_eol_to_chars();
00202 astring box_state;
00203 _post->show(box_state);
00204 if (box_state.t()) output += box_state;
00205 else
00206 output += astring("No items are awaiting delivery.")
00207 + parser_bits::platform_eol_to_chars();
00208 }
00209
00210 void post_office::drop_off(const unique_int &id, letter *package)
00211 {
00212 #ifdef DEBUG_POST_OFFICE
00213 FUNCDEF("drop_off");
00214 LOG(astring(astring::SPRINTF, "mailbox drop for %d: ", id)
00215 + package->text_form());
00216 #endif
00217 _post->drop_off(id, package);
00218 #ifdef DEBUG_POST_OFFICE
00219 if (!route_listed(id)) {
00220 LOG(a_sprintf("letter for %d has no route!", id));
00221 }
00222 #endif
00223 }
00224
00225 bool post_office::pick_up(const unique_int &id, letter * &package)
00226 {
00227 #ifdef DEBUG_POST_OFFICE
00228 FUNCDEF("pick_up");
00229 #endif
00230 bool to_return = _post->pick_up(id, package);
00231 #ifdef DEBUG_POST_OFFICE
00232 if (to_return)
00233 LOG(astring(astring::SPRINTF, "mailbox grab for %d: ", id)
00234 + package->text_form());
00235 #endif
00236 return to_return;
00237 }
00238
00239 bool post_office::route_listed(const unique_int &id)
00240 {
00241 int_set route_set;
00242 get_route_list(route_set);
00243 return route_set.member(id.raw_id());
00244 }
00245
00246 void post_office::get_route_list(int_set &route_set)
00247 {
00248 auto_synchronizer l(c_mutt);
00249
00250
00251 route_set.reset();
00252
00253 if (!_routes->elements()) {
00254
00255 return;
00256 }
00257
00258 for (int i = 0; i < _routes->elements(); i++) {
00259 const tagged_mail_stop *tag = _routes->get(i);
00260 if (!tag) continue;
00261 route_set.add(tag->_id.raw_id());
00262 }
00263 }
00264
00265 void post_office::clean_package_list(post_office &formal(post),
00266 letter_morph &to_clean)
00267 {
00268 FUNCDEF("clean_package_list");
00269 auto_synchronizer l(c_mutt);
00270
00271
00272 while (to_clean.elements()) {
00273 letter *package = to_clean.acquire(0);
00274 to_clean.zap(0, 0);
00275 if (!package) {
00276 LOG("saw empty package in list to clean!");
00277 continue;
00278 }
00279 WHACK(package);
00280 }
00281 }
00282
00283 bool post_office::deliver_mail_on_route(const unique_int &route,
00284 ethread &carrier)
00285 {
00286 FUNCDEF("deliver_mail_on_route");
00287 auto_synchronizer l(c_mutt);
00288
00289 #ifdef DEBUG_POST_OFFICE
00290 time_stamp enter;
00291 #endif
00292 if (carrier.should_stop()) return true;
00293
00294 int deliveries = 0;
00295 letter_morph items_for_route;
00296
00297
00298
00299 while (deliveries < DELIVERIES_ALLOWED) {
00300 if (carrier.should_stop())
00301 return true;
00302 letter *package;
00303 if (!_post->pick_up(route, package)) {
00304
00305 break;
00306 }
00307 deliveries++;
00308 items_for_route.append(package);
00309 }
00310
00311 if (!items_for_route.elements()) return true;
00312
00313
00314 tagged_mail_stop *real_route = _routes->find(route);
00315 if (!real_route) {
00316
00317 LOG(astring(astring::SPRINTF, "route %d disappeared!", route.raw_id()));
00318 clean_package_list(*this, items_for_route);
00319 return true;
00320 }
00321
00322
00323 for (int t = 0; t < items_for_route.elements(); t++) {
00324 if (carrier.should_stop()) {
00325
00326 return true;
00327 }
00328 letter *package = items_for_route.acquire(t);
00329
00330 mail_stop::items_to_deliver pack(route, package);
00331 real_route->_route->invoke_callback(pack);
00332
00333 }
00334
00335 bool finished_all = (deliveries < DELIVERIES_ALLOWED);
00336
00337
00338 if (carrier.should_stop()) return true;
00339
00340
00341
00342 bool cleaning_time = time_stamp() > *_next_cleaning;
00343 if (cleaning_time) {
00344 _post->clean_up();
00345 _next_cleaning->reset(CLEANING_INTERVAL);
00346 }
00347
00348 time_stamp exit;
00349 #ifdef DEBUG_POST_OFFICE
00350 int duration = int(exit.value() - enter.value());
00351 if (duration > 20)
00352 LOG(a_sprintf("deliveries took %d ms.", duration));
00353 #endif
00354 return finished_all;
00355 }
00356
00357 bool post_office::register_route(const unique_int &id,
00358 mail_stop &carrier_path)
00359 {
00360 auto_synchronizer l(c_mutt);
00361
00362 tagged_mail_stop *found = _routes->find(id);
00363 if (found) return false;
00364
00365 postal_carrier *po = new postal_carrier(*this, id);
00366 unique_int thread_id = _threads->add_thread(po, false, NIL);
00367
00368 tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
00369 thread_id);
00370 _routes->append(new_stop);
00371
00372 po->start(NIL);
00373
00374 return true;
00375 }
00376
00377 bool post_office::unregister_route(const unique_int &id)
00378 {
00379 auto_synchronizer l(c_mutt);
00380
00381 tagged_mail_stop *tag = _routes->find(id);
00382 if (!tag) return false;
00383 unique_int thread_id = tag->_id;
00384 _routes->zap(id);
00385 _threads->zap_thread(thread_id);
00386 return true;
00387 }
00388
00389 }
00390
00391