00001 #ifndef POST_OFFICE_IMPLEMENTATION_FILE
00002 #define POST_OFFICE_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "letter.h"
00019 #include "mailbox.h"
00020 #include "post_office.h"
00021
00022 #include <basis/mutex.h>
00023 #include <basis/log_base.h>
00024 #include <basis/portable.h>
00025 #include <basis/set.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <data_struct/unique_id.h>
00028 #include <mechanisms/ithread.h>
00029 #include <mechanisms/thread_cabinet.h>
00030 #include <mechanisms/time_stamp.h>
00031 #include <nodes/catalogable.h>
00032 #include <nodes/object_catalog.h>
00033 #include <opsystem/path_configuration.h>
00034
00035 using namespace nodes;
00036
00037
00038
00039
00040 #undef LOG
00041 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger(), a)
00042
00043 const int CLEANING_INTERVAL = 14 * SECOND_ms;
00044
00045
00046 const int SNOOZE_TIME_FOR_POSTMAN = 42;
00047
00048
00049
00050
00051 const int DELIVERIES_ALLOWED = 350;
00052
00053
00055
00056
00057
00058 class postal_carrier : public ithread
00059 {
00060 public:
00061 postal_carrier(post_office &parent, const unique_int &route)
00062 : ithread(SNOOZE_TIME_FOR_POSTMAN, ithread::SLACK_INTERVAL),
00063 _parent(parent),
00064 _route(route)
00065 {}
00066
00067 IMPLEMENT_CLASS_NAME("postal_carrier");
00068
00069 void perform_activity(void *) {
00070 FUNCDEF("perform_activity");
00071 bool finished;
00072 try {
00073 finished = _parent.deliver_mail_on_route(_route, *this);
00074 } catch(...) {
00075 LOG("caught exception during mail delivery!");
00076 }
00077 if (!finished) {
00078
00079 reschedule();
00080 } else {
00081 reschedule(SNOOZE_TIME_FOR_POSTMAN);
00082 }
00083 }
00084
00085 private:
00086 post_office &_parent;
00087 unique_int _route;
00088 };
00089
00091
00092 class postal_cache : public mailbox {};
00093
00095
00096 class tagged_mail_stop : public nodes::catalogable
00097 {
00098 public:
00099 mail_stop *_route;
00100 unique_int _thread_id;
00101
00102 tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NIL,
00103 const unique_int &thread_id = 0)
00104 : catalogable(id), _route(route), _thread_id(thread_id) {}
00105
00106 IMPLEMENT_CLASS_NAME("tagged_mail_stop");
00107
00108 virtual istring catalogable_name() const {
00109 return isprintf("%s: id=%d, addr=%08lx, thr_id=%d",
00110 static_class_name(), id().raw_id(), _route, _thread_id.raw_id());
00111 }
00112 };
00113
00114 class route_map : public object_catalog {};
00115
00117
00118 class letter_morph : public amorph<letter> {};
00119
00121
00122 post_office::post_office()
00123 : _post(new mailbox),
00124 _routes(new route_map),
00125 _stock_lock(new mutex),
00126 _next_cleaning(new time_stamp),
00127 _threads(new thread_cabinet)
00128 {
00129 }
00130
00131 post_office::~post_office()
00132 {
00133 stop_serving();
00134 WHACK(_post);
00135 WHACK(_routes);
00136 WHACK(_next_cleaning);
00137 WHACK(_stock_lock);
00138 WHACK(_threads);
00139 }
00140
00141 void post_office::show_routes(istring &to_fill)
00142 {
00143
00144 objcat_iterator *iter = _routes->find_head();
00145 istring current_line;
00146 istring temp;
00147 if (_routes->elements(*iter))
00148 to_fill += istring("Mail Delivery Routes:") + log_base::platform_ending();
00149
00150 for ( ; !iter->is_tail(); iter->next()) {
00151 const tagged_mail_stop *tag
00152 = dynamic_cast<const tagged_mail_stop *>(iter->cat());
00153 if (!tag) continue;
00154
00155 temp = istring(istring::SPRINTF, "%d ", tag->id().raw_id());
00156 if (current_line.length() + temp.length() >= 80) {
00157 current_line += log_base::platform_ending();
00158 to_fill += current_line;
00159 current_line.reset();
00160 }
00161 current_line += temp;
00162 }
00163 _routes->iter_unlock(iter);
00164
00165 if (!!current_line) to_fill += current_line;
00166 }
00167
00168 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
00169
00170 void post_office::show_mail(istring &output)
00171 {
00172 output.reset();
00173 output += log_base::platform_ending();
00174 output += istring("Mailbox Contents at ") + utility::timestamp(true, true)
00175 + log_base::platform_ending() + log_base::platform_ending();
00176 istring box_state;
00177 _post->show(box_state);
00178 if (box_state.t()) output += box_state;
00179 else
00180 output += istring("No items are awaiting delivery.")
00181 + log_base::platform_ending();
00182 }
00183
00184 void post_office::drop_off(const unique_int &id, letter *package)
00185 {
00186 FUNCDEF("drop_off");
00187 #ifdef DEBUG_POST_OFFICE
00188 LOG(istring(istring::SPRINTF, "mailbox drop for %d: ", id)
00189 + package->text_form());
00190 #endif
00191 _post->drop_off(id, package);
00192 #ifdef DEBUG_POST_OFFICE
00193 if (!route_listed(id)) {
00194 LOG(isprintf("letter for %d has no route!", id));
00195 }
00196 #endif
00197 }
00198
00199 bool post_office::pick_up(const unique_int &id, letter * &package)
00200 {
00201 FUNCDEF("pick_up");
00202 bool to_return = _post->pick_up(id, package);
00203 #ifdef DEBUG_POST_OFFICE
00204 if (to_return)
00205 LOG(istring(istring::SPRINTF, "mailbox grab for %d: ", id)
00206 + package->text_form());
00207 #endif
00208 return to_return;
00209 }
00210
00211 bool post_office::route_listed(const unique_int &id)
00212 {
00213 int_set route_set;
00214 get_route_list(route_set);
00215 return route_set.member(id.raw_id());
00216 }
00217
00218 void post_office::get_route_list(int_set &route_set)
00219 {
00220
00221 route_set.reset();
00222
00223 objcat_iterator *iter = _routes->find_head();
00224 if (!_routes->elements(*iter)) {
00225
00226 _routes->iter_unlock(iter);
00227 return;
00228 }
00229
00230 for ( ; !iter->is_tail(); iter->next()) {
00231 const tagged_mail_stop *tag
00232 = dynamic_cast<const tagged_mail_stop *>(iter->cat());
00233 if (!tag) continue;
00234 route_set.add(tag->id().raw_id());
00235 }
00236 _routes->iter_unlock(iter);
00237 }
00238
00239 void post_office::clean_package_list(post_office &formal(post),
00240 letter_morph &to_clean)
00241 {
00242 FUNCDEF("clean_package_list");
00243
00244 while (to_clean.elements()) {
00245 letter *package = to_clean.acquire(0);
00246 to_clean.zap(0, 0);
00247 if (!package) {
00248 LOG("saw empty package in list to clean!");
00249 continue;
00250 }
00251 WHACK(package);
00252 }
00253 }
00254
00255 bool post_office::deliver_mail_on_route(const unique_int &route,
00256 ithread &carrier)
00257 {
00258 FUNCDEF("deliver_mail_on_route");
00259 #ifdef DEBUG_POST_OFFICE
00260 time_stamp enter;
00261 #endif
00262 if (carrier.should_stop()) return true;
00263
00264 int deliveries = 0;
00265 letter_morph items_for_route;
00266
00267
00268
00269 while (deliveries < DELIVERIES_ALLOWED) {
00270 if (carrier.should_stop())
00271 return true;
00272 letter *package;
00273 if (!_post->pick_up(route, package)) {
00274
00275 break;
00276 }
00277 deliveries++;
00278 items_for_route.append(package);
00279 }
00280
00281 if (!items_for_route.elements()) return true;
00282
00283
00284 objcat_iterator *iter = _routes->find(route);
00285 if (!iter) {
00286
00287 LOG(istring(istring::SPRINTF, "route %d disappeared!", route.raw_id()));
00288 clean_package_list(*this, items_for_route);
00289 return true;
00290 }
00291
00292 objcat_isolater *iso = _routes->separate(iter);
00293 if (!iso) {
00294
00295 LOG(isprintf("failed to separate route %d!", route.raw_id()));
00296 clean_package_list(*this, items_for_route);
00297 _routes->iter_unlock(iter);
00298 return true;
00299 }
00300 tagged_mail_stop *real_route = dynamic_cast<tagged_mail_stop *>(&iso->cat());
00301 if (!real_route) {
00302
00303 LOG(isprintf("route was wrong type for %d!", route.raw_id()));
00304 _routes->rejoin(iso);
00305 clean_package_list(*this, items_for_route);
00306 return true;
00307 }
00308
00309
00310 for (int t = 0; t < items_for_route.elements(); t++) {
00311 if (carrier.should_stop()) {
00312
00313 _routes->rejoin(iso);
00314 return true;
00315 }
00316 letter *package = items_for_route.acquire(t);
00317
00318 mail_stop::items_to_deliver pack(route, package);
00319 real_route->_route->invoke_callback(pack);
00320
00321 }
00322
00323 _routes->rejoin(iso);
00324
00325 bool finished_all = (deliveries < DELIVERIES_ALLOWED);
00326
00327
00328 if (carrier.should_stop()) return true;
00329
00330
00331
00332 bool cleaning_time = false;
00333 {
00334
00335 auto_synchronizer l(*_stock_lock);
00336 cleaning_time = time_stamp() > *_next_cleaning;
00337 }
00338 if (cleaning_time) {
00339 _post->clean_up();
00340 _next_cleaning->reset(CLEANING_INTERVAL);
00341 }
00342
00343 time_stamp exit;
00344 #ifdef DEBUG_POST_OFFICE
00345 int duration = int(exit.value() - enter.value());
00346 if (duration > 20)
00347 LOG(isprintf("deliveries took %d ms.", duration));
00348 #endif
00349 return finished_all;
00350 }
00351
00352 bool post_office::register_route(const unique_int &id,
00353 mail_stop &carrier_path)
00354 {
00355 objcat_iterator *iter = _routes->find(id);
00356 if (iter) {
00357 _routes->iter_unlock(iter);
00358 return false;
00359 }
00360 postal_carrier *po = new postal_carrier(*this, id);
00361 unique_int thread_id = _threads->add_thread(po, false, NIL);
00362
00363 tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
00364 thread_id);
00365 _routes->add(new_stop);
00366
00367 po->start(NIL);
00368
00369 return true;
00370 }
00371
00372 bool post_office::unregister_route(const unique_int &id)
00373 {
00374 objcat_iterator *iter = _routes->find(id);
00375 if (!iter) return false;
00376 unique_int thread_id = 0;
00377 const tagged_mail_stop *tag
00378 = dynamic_cast<const tagged_mail_stop *>(iter->cat());
00379 if (tag) thread_id = tag->_thread_id;
00380
00381 _routes->iter_unlock(iter);
00382 _routes->zap_id(id);
00383 _threads->zap_thread(thread_id);
00384 return true;
00385 }
00386
00387
00388 #endif //POST_OFFICE_IMPLEMENTATION_FILE
00389