scheduler.cpp

Go to the documentation of this file.
00001 #ifndef SCHEDULER_IMPLEMENTATION_FILE
00002 #define SCHEDULER_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : scheduler                                                         *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 1991-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 // Notes:
00019 //
00020 //   _pending_sends must be touched only when the schedule is locked.
00021 //
00022 //   _weight_tracker likewise.
00023 
00024 #include "implem_only.h"
00025 #include "schedulable.h"
00026 #include "schedule_actor.h"
00027 #include "scheduler.h"
00028 
00029 #include <basis/istring.h>
00030 #include <basis/mutex.h>
00031 #include <basis/portable.h>
00032 #include <basis/set.cpp>
00033 #include <mathematics/averager.cpp>
00034 #include <data_struct/memory_limiter.h>
00035 #include <mechanisms/ithread.h>
00036 #include <nodes/safe_list.h>
00037 #include <nodes/safe_node.h>
00038 #include <post_office/letter.h>
00039 #include <post_office/mailbox.h>
00040 
00041 using namespace basis;
00042 using namespace nodes;
00043 
00044 //#define DEBUG_SCHEDULER
00045   // uncomment for noisy version.
00046 
static const int DURATION_AVERAGER_SIZE = 20;
  // how many samples each of the scheduler's rolling averagers (processing,
  // activation and item durations, plus bandwidths) keeps.

//hmmm: make tunable.
static const int ENFORCED_SCHEDULER_PAUSE = 42;
  // the pause handed to the driver thread's reschedule() when a scheduling
  // run used up all of its hits while work remained; it keeps the scheduler
  // from running continually.

static const int SCHEDULE_HITS_ALLOWED = 1480;
  // the most item activations the driver asks for in one scheduling run.
00059 
// gives the weight "size" recorded for actor "id" back to _weight_tracker,
// logging a complaint when the tracker rejects the deletion.  the expansion
// needs "_weight_tracker" and LOG in scope, and per the file notes must only
// be used while the schedule is locked.  the do/while wrapper makes the macro
// a single statement, so an "else" following a call site binds correctly.
#define LIFT_WEIGHT_AND_VERIFY(id, size) \
  do { \
    if (!_weight_tracker->record_deletion((id), (size))) { \
      LOG(istring(istring::SPRINTF, "size for actor %d is botched!", (id))); \
    } \
  } while (false)
00064 
00066 
00067 class waiting_item : public letter
00068 {
00069 public:
00070   schedulable *_held;
00071 
00072   istring text_form() const
00073       { if (_held) return _held->schedulable_text_form(); else return ""; }
00074 
00075   waiting_item(schedulable *to_hold) : _held(to_hold) {}
00076 };
00077 
00079 
00080 // the scheduler's iterator relies on the scheduler to set its hidden
00081 // iterator pointer correctly.
00082 
00083 schedule_iterator::schedule_iterator(const scheduler &mgr)
00084 : _manager(mgr),
00085   _hidden_iterator(NIL)
00086 {}
00087 
00088 schedule_iterator::~schedule_iterator() { _hidden_iterator = NIL; }
00089 
00090 void schedule_iterator::next() { _hidden_iterator->next(); }
00091 
00092 void schedule_iterator::previous() { _hidden_iterator->previous(); }
00093 
00094 schedulable *schedule_iterator::access()
00095 { return dynamic_cast<schedulable *>(_hidden_iterator->access()); }
00096 
00097 bool schedule_iterator::is_head() { return _hidden_iterator->is_head(); }
00098 
00099 bool schedule_iterator::is_tail() { return _hidden_iterator->is_tail(); }
00100 
00101 void schedule_iterator::jump_head() { _hidden_iterator->jump_head(); }
00102 
00103 void schedule_iterator::jump_tail() { _hidden_iterator->jump_tail(); }
00104 
00106 
00107 class schedule_driver : public ithread
00108 {
00109 public:
00110   schedule_driver(scheduler &to_drive);
00111   ~schedule_driver();
00112 
00113   IMPLEMENT_CLASS_NAME("schedule_driver");
00114 
00115 private:
00116   scheduler &_to_drive;  // the scheduling object we push around.
00117 
00118   void perform_activity(void *thread_data);
00119     // main function of object; invoked by the ithread base.
00120 };
00121 
00123 
00124 schedule_driver::schedule_driver(scheduler &to_drive)
00125 : ithread(to_drive.snooze_interval(), ithread::SLACK_INTERVAL),
00126   _to_drive(to_drive)
00127 {}
00128 
00129 schedule_driver::~schedule_driver() { stop(); }
00130 
00131 void schedule_driver::perform_activity(void *formal(thread_data))
00132 {
00133   FUNCDEF("perform_activity");
00134   outcome ret;  // used inside loop.
00135   // run until thread is told to quit.
00136   time_stamp start_time;  // click, stopwatch starts...
00137   try {
00138     // the scheduling happens for a while.
00139     ret = _to_drive.process_schedule(SCHEDULE_HITS_ALLOWED);
00140   } catch (...) {
00141     LOG("an exception has occurred during schedule driving!");
00142   }
00143   // click, stopwatch stops...
00144   time_stamp stop_time;
00145   int duration = int(stop_time.value() - start_time.value());
00146 //#ifdef DEBUG_SCHEDULER
00147   if (duration > 500)
00148     LOG(istring(istring::SPRINTF, "scheduling took %d ms.", duration));  
00149 //#endif
00150   if (ret == scheduler::MORE_LEFT) {
00151     // if there were more items to do, we re-energize the scheduler.
00152     reschedule(ENFORCED_SCHEDULER_PAUSE);
00153   }
00154 }
00155 
00157 
00158 scheduler::scheduler(actor_mapper &mapper, int time_per, int maximum_weight,
00159     int max_actor_weight, int snooze_interval)
00160 : _mapper(mapper),
00161   _time_per_item(time_per),
00162   _schedule(new safe_list),
00163   _processing_durations(new int_averager(DURATION_AVERAGER_SIZE, false)),
00164   _activation_durations(new int_averager(DURATION_AVERAGER_SIZE, false)),
00165   _item_durations(new int_averager(DURATION_AVERAGER_SIZE, false)),
00166   _bandwidths(new int_averager(DURATION_AVERAGER_SIZE, false)),
00167   _active_item(NIL),
00168   _weight_tracker(new memory_limiter(maximum_weight, max_actor_weight)),
00169   _pending_sends(new mailbox),
00170   _timely_to_run(new int_set),
00171   _queued_actors(new int_set),
00172   _snooze_interval(snooze_interval),
00173   _driver(new schedule_driver(*this))
00174 {}
00175 
00176 scheduler::~scheduler()
00177 {
00178   FUNCDEF("destructor");
00179   // shut down the scheduling thread before anything else happens.
00180   _driver->stop();
00181 
00182   // check if the schedule was clean on shutdown or not.
00183   safe_list_write_iterator *iter = _schedule->open_writer();
00184   if (!_schedule->empty(*iter)) {
00185 #ifdef DEBUG_SCHEDULER
00186     LOG("Some items remain in the schedule!...");
00187     // close our iterator so the text_form function can do one.
00188     _schedule->close_writer(iter);
00189     string_array dump_list;
00190     schedule_text_form(dump_list, 0);
00191     istring dump;
00192     for (int i = 0; i < dump_list.length(); i++)
00193       dump += dump_list[i] + log_base::platform_ending();
00194     LOG(dump);  // dump the schedule's contents out.
00195 
00196     dump.reset();
00197     pending_text_form(dump, 0);
00198     for (int j = 0; j < dump_list.length(); j++)
00199       dump += dump_list[j] + log_base::platform_ending();
00200     LOG(dump);  // dump the pending items out.
00201 
00202     // re-open the iterator so we can clean the schedule and release all
00203     // the trapped memory.
00204     iter = _schedule->open_writer();
00205 #endif
00206     while (!iter->is_tail()) {
00207       schedulable *found = dynamic_cast<schedulable *>(iter->access());
00208       if (found) LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00209       _schedule->zap(*iter);
00210     }
00211   }
00212   _schedule->close_writer(iter);
00213   WHACK(_timely_to_run);
00214   WHACK(_queued_actors);
00215   WHACK(_schedule);
00216   WHACK(_processing_durations);
00217   WHACK(_activation_durations);
00218   WHACK(_item_durations);
00219   WHACK(_bandwidths);
00220   WHACK(_weight_tracker);
00221   WHACK(_pending_sends);
00222   WHACK(_driver);
00223 }
00224 
00225 const int_averager &scheduler::item_durations() const
00226 { return *_item_durations; }
00227 
00228 const int_averager &scheduler::bandwidths() const
00229 { return *_bandwidths; }
00230 
00231 const int_averager &scheduler::processing_durations() const
00232 { return *_processing_durations; }
00233 
00234 const int_averager &scheduler::activation_durations() const
00235 { return *_activation_durations; }
00236 
00237 const schedulable *scheduler::active_item(schedule_iterator &formal(to_use)) const
00238 { return _active_item; }
00239 
00240 void scheduler::start_scheduling() { _driver->start(NIL); }
00241 
00242 void scheduler::stop_scheduling() { _driver->stop(); }
00243 
00244 schedule_iterator *scheduler::open_iterator(common::list_positions where)
00245 {
00246   if ( (where != common::HEAD) && (where != common::TAIL) )
00247     return NIL;  // bad position.
00248   safe_list_write_iterator *iter = _schedule->open_writer(where);
00249   schedule_iterator *to_return = new schedule_iterator(*this);
00250   to_return->_hidden_iterator = iter;
00251   return to_return;
00252 }
00253 
00254 void scheduler::close_iterator(schedule_iterator * &to_close)
00255 {
00256   if (!to_close) return;  // argh!  what the heck are they trying to pull?
00257   if (to_close->_hidden_iterator) {
00258     _schedule->close_writer(to_close->_hidden_iterator);
00259     to_close->_hidden_iterator = NIL;
00260   }
00261   WHACK(to_close);
00262 }
00263 
00264 void scheduler::zap(schedule_iterator &to_use)
00265 {
00266   FUNCDEF("zap");
00267   schedulable *found = dynamic_cast<schedulable *>(to_use.access());
00268   LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00269   _schedule->zap(*to_use._hidden_iterator); 
00270 }
00271 
00272 schedule_iterator *scheduler::find(scheduling_id to_find)
00273 {
00274   safe_list_write_iterator *iter = _schedule->open_writer();
00275   while (!iter->is_tail()) {
00276     schedulable *examining = dynamic_cast<schedulable *>(iter->access());
00277     if (examining && (examining->item_id() == to_find)) {
00278       schedule_iterator *to_return = new schedule_iterator(*this);
00279       to_return->_hidden_iterator = iter;
00280       return to_return;
00281     }
00282     iter->next();
00283   }
00284   _schedule->close_writer(iter);
00285   return NIL;
00286 }
00287 
00288 bool scheduler::items_scheduled(schedule_iterator &iter, int actor)
00289 {
00290   // check the active item first.
00291   const schedulable *active = active_item(iter);
00292   if (active && (active->actor_id() == actor))
00293     return true;
00294 
00295   // now scoot through and look at the other items.
00296   iter.jump_head();
00297   while (!iter.is_tail()) {
00298     schedulable *examining = dynamic_cast<schedulable *>(iter.access());
00299     if (examining && (examining->actor_id() == actor))
00300       return true;
00301     iter.next();
00302   }
00303 
00304   return false;
00305 }
00306 
00307 outcome scheduler::do_preprocess(schedulable &to_preprocess)
00308 {
00309   FUNCDEF("do_preprocess");
00310   // now preprocess the item as it begins to slide into operation.
00311   actor_mapping *actor = _mapper.open_actor(to_preprocess.actor_id());
00312   if (!actor) {
00313     // toss this item because there's no actor for it.
00314     LOG(istring(istring::SPRINTF, "could not find actor for id %d; "
00315         "cancelling item.", to_preprocess.actor_id()));
00316     return CANCEL;
00317   }
00318 
00319   // reset the time stamp on the item to now, since we're just about to
00320   // preprocess.
00321   to_preprocess.stats().entry = time_stamp();
00322 
00323   // reset the expiration counter since we're starting the item now.
00324   to_preprocess.expiration().kabump();
00325 
00326   outcome ret = actor->_actor.preprocess(to_preprocess);
00327   _mapper.close_actor(actor);
00328   to_preprocess.stats().last_outcome = ret.value();
00329   if (ret != schedule_actor::OKAY) return CANCEL;
00330   return OKAY;
00331 }
00332 
00333 outcome scheduler::add(schedulable *to_add, bool single_file)
00334 {
00335   FUNCDEF("add");
00336   if (!to_add) return BAD_INPUT;
00337   if (!to_add->item_id()) {
00338     WHACK(to_add);
00339     return BAD_INPUT;
00340   }
00341 
00342   // see if this item has already been scheduled somehow.  the list must not
00343   // be locked before this check, since the find method will also lock it.
00344   schedule_iterator *found = find(to_add->item_id());
00345   if (found) {
00346     LOG(istring(istring::SPRINTF, "item %d is already in schedule!",
00347         to_add->item_id().raw_id()));
00348     close_iterator(found);
00349     WHACK(to_add);
00350     return BAD_INPUT;
00351   }
00352 
00353   // create a write iterator to enable us to store the item in the schedule.
00354   safe_list_write_iterator *iter = _schedule->open_writer(safe_list::TAIL);
00355 
00356   // check that we have enough memory left.
00357   if (!_weight_tracker->okay_allocation(to_add->actor_id(), to_add->weight())) {
00358 #ifdef DEBUG_SCHEDULER
00359     LOG(isprintf("we are too bloated for item %d!",
00360         to_add->item_id().raw_id()));
00361 #endif
00362     _schedule->close_writer(iter);
00363     WHACK(to_add);
00364     return NO_SPACE;
00365   }
00366 
00367   // reset certain data bits appropriately...
00368   to_add->stats().last_outcome = int(schedule_actor::OKAY);
00369 
00370   if (single_file) {
00371     // instead of queueing this up for action immediately, we add it to the
00372     // mailbox.  then, once we've dropped our lock on the schedule, we push any
00373     // items that are ready into the real schedule.
00374     waiting_item *new_guy = new waiting_item(to_add);
00375     _pending_sends->drop_off(to_add->actor_id(), new_guy);
00376 #ifdef DEBUG_SCHEDULER
00377     LOG(isprintf("adding item %d to the pending sends.",
00378         to_add->item_id().raw_id()));
00379 #endif
00380     _schedule->close_writer(iter);
00381     promote_actor(to_add->actor_id());
00382     return OKAY;
00383   } 
00384 
00385   // stuff the item into the schedule now instead of later.
00386   _schedule->insert(*iter, to_add);
00387 
00388 #ifdef DEBUG_SCHEDULER
00389   LOG(isprintf("added item %d directly to schedule.",
00390       to_add->item_id().raw_id()));
00391 #endif
00392 
00393   // done editing the schedule.
00394   _schedule->close_writer(iter);
00395   return OKAY;
00396 }
00397 
00398 bool scheduler::time_to_schedule(schedulable &to_check)
00399 {
00400   if (to_check.start_time <= time_stamp()) return true;
00401   return false;
00402 }
00403 
00404 bool scheduler::find_timely_items(int_set &timely)
00405 {
00406   FUNCDEF("find_timely_items");
00407   timely.reset();  // clear out any prior contents.
00408   safe_list_write_iterator *iter;
00409   for (iter = _schedule->open_writer(); !iter->is_tail(); iter->next()) {
00410     if (!iter->access()) break;  // no elements, so scram.
00411     // look at the current item in the schedule.
00412     schedulable *to_examine = dynamic_cast<schedulable *>(iter->access());
00413     if (!to_examine) {
00414       // something wrong with this one.
00415       LOG(istring("found bad schedulable in list!"));
00416       _schedule->zap(*iter);
00417       continue;
00418     }
00419     // check if we already added this one.  we don't bother checking again.
00420     if (timely.member(to_examine->item_id().raw_id())) continue;
00421     // if it's time now, we add this one to our list of interesting items.
00422     if (time_to_schedule(*to_examine))
00423       timely += to_examine->item_id().raw_id();
00424   }
00425   _schedule->close_writer(iter);
00426   return !!timely.elements();
00427 }
00428 
00429 outcome scheduler::schedule_item(int &hits, schedulable &to_process)
00430 {
00431   FUNCDEF("schedule_item");
00432   if (!to_process.preprocessed()) {
00433     // this item has not been preprocessed yet, so we'll do it now.
00434 #ifdef DEBUG_SCHEDULER
00435     LOG(isprintf("preprocessing %d.", to_process.item_id().raw_id()));
00436 #endif
00437     outcome ret = do_preprocess(to_process);
00438     if (ret != OKAY) {
00439       // if it wasn't okay, they don't want to schedule it after all.
00440       LOG(isprintf("preprocess disliked item %d, gave outcome ",
00441           to_process.item_id().raw_id()) + outcome_name(ret));
00442       return ret;
00443     }
00444     to_process.preprocessed(true);
00445 #ifdef DEBUG_SCHEDULER
00446     LOG(isprintf("preprocessed %d.", to_process.item_id().raw_id()));
00447 #endif
00448   }
00449 
00450   // find the actor on this item, for future reference.
00451   actor_mapping *actor = _mapper.open_actor(to_process.actor_id());
00452   if (!actor) {
00453     LOG(isprintf("could not find actor for id %d; now whacking item.",
00454         to_process.actor_id()));
00455     return schedule_actor::CANCEL;
00456   } 
00457 
00458   outcome current_outcome = schedule_actor::OKAY;
00459   if (to_process.expiration().dead()) {
00460     // this item is already deceased.
00461     LOG(isprintf("item %d is dead.", to_process.item_id().raw_id()));
00462     current_outcome = schedule_actor::CANCEL;
00463   } else if (to_process.expiration_interval) {
00464     // if an expiration interval was set, we check that the item has
00465     // not already expired.
00466     outcome ret = check_expiration(to_process, actor->_actor);
00467     if (ret != scheduler::OKAY) {
00468       // need to chuck this bugger out of the schedule now; it expired and
00469       // didn't get a reprieve.
00470       LOG(isprintf("item %d is cancelled!  expired and not renewed.",
00471           to_process.item_id().raw_id())
00472           + istring("  got outcome ") + outcome_name(ret));
00473       current_outcome = schedule_actor::CANCEL;
00474       to_process.expiration().reset(0, 0);
00475     }
00476   } else {
00477     // it's not dead, and it has no expiration interval, so we say it had
00478     // a healthy heartbeat and should not expire.
00479     to_process.expiration().kabump();
00480   }
00481 
00482   if (current_outcome != schedule_actor::OKAY) {
00483     return current_outcome;
00484   }
00485 
00486   // if nothing bad has happened so far, schedule the item.
00487   time_stamp activation_start;
00488 #ifdef DEBUG_SCHEDULER
00489   LOG(isprintf("activating %d.", to_process.item_id().raw_id()));
00490 #endif
00491   // give the item some time to operate.
00492   current_outcome = actor->_actor.activate(to_process, _time_per_item);
00493   if (current_outcome != schedule_actor::POSTPONE) {
00494     // we have now definitely activated something.
00495     hits++;
00496 #ifdef DEBUG_SCHEDULER
00497     LOG(isprintf("activated %d (outcome=%s)", to_process.item_id().raw_id(),
00498         schedule_actor::outcome_name(current_outcome)));
00499 #endif
00500   }
00501 
00502   int activation_duration = int(time_stamp().value()
00503       - activation_start.value());
00504   if (activation_duration)
00505     _activation_durations->add(activation_duration, 1);
00506 
00507   to_process.stats().last_outcome = current_outcome.value();
00508     // record what happened.
00509 
00510   // process the returned status from scheduling the item.  if the item
00511   // needs to be removed, it gets whacked here.
00512   if ( (current_outcome == schedule_actor::FINISHED)
00513         || (current_outcome == schedule_actor::CANCEL) ) {
00514 #ifdef DEBUG_SCHEDULER
00515     LOG(isprintf("postprocessing %d.", to_process.item_id().raw_id()));
00516 #endif
00517     // this item has had it; do its postprocessing.
00518     actor->_actor.postprocess(to_process);
00519       // we don't record the postprocessing outcome in the stats, because
00520       // the last outcome is supposed to be the last one encountered
00521       // _during_ scheduling.
00522 #ifdef DEBUG_SCHEDULER
00523     LOG(isprintf("postprocessed %d.", to_process.item_id().raw_id()));
00524 #endif
00525     _mapper.close_actor(actor);
00526       // close out the actor since we're done talking to it about this
00527       // item forever.
00528 
00529     // set the exit time.
00530     to_process.stats().exit = time_stamp();
00531 
00532 #ifdef DEBUG_SCHEDULER
00533     LOG(isprintf("deleting %d.", to_process.item_id().raw_id()));
00534 #endif
00535 
00536     // get its overall duration...
00537     if (to_process.stats().exit.initialized()
00538         && to_process.stats().entry.initialized()) {
00539       time_stamp exit = (const time_stamp &)to_process.stats().exit;
00540       time_stamp entry = (const time_stamp &)to_process.stats().entry;
00541       int overall_duration = int(exit.value() - entry.value());
00542       if (overall_duration) {
00543         _item_durations->add(overall_duration, 1);
00544 #ifdef DEBUG_SCHEDULER
00545         if (!to_process.weight())
00546           LOG("invalid weight (zero) for transmission!!!");
00547 #endif
00548         double raw_bandwidth = double(to_process.weight())
00549             / overall_duration;
00550           // compute bytes per millisecond.
00551         int bandwidth = int(raw_bandwidth / 1024.0 * 1000.0);
00552         _bandwidths->add(bandwidth, 1);
00553       }
00554     }
00555 
00556     return current_outcome;
00557   }
00558 
00559   // shut down the actor now that we're done with it.
00560   _mapper.close_actor(actor);
00561 
00562   // make sure, if it's periodic, that its start time gets bumped.
00563   if (to_process.entry_interval
00564       && (current_outcome == schedule_actor::POSTPONE))
00565     to_process.start_time = time_stamp(to_process.entry_interval);
00566   
00567   return current_outcome;
00568 }
00569 
00570 void scheduler::crunch_on_timely_items(int &hits, int hits_allowed,
00571     int_set &timely)
00572 {
00573   FUNCDEF("crunch_on_timely_items");
00574   safe_list_write_iterator *iter;
00575   for (iter = _schedule->open_writer(); !iter->is_tail(); iter->next()) {
00576     if (hits > hits_allowed) break;  // we've used up our welcome.
00577     if (!iter->access()) break;  // no elements, so scram.
00578     if (!timely.elements()) break;  // we ran out of things to look at.
00579 
00580     // look at the current item in the schedule.
00581     schedulable *to_examine = dynamic_cast<schedulable *>(iter->access());
00582     if (!to_examine) {
00583       // something wrong with this one.
00584       LOG(istring("found bad schedulable in list!"));
00585       _schedule->zap(*iter);
00586       continue;
00587     }
00588     // make sure this is one that we're authorized to talk to.
00589     if (!timely.member(to_examine->item_id().raw_id()))
00590       continue;
00591     // re-check that this particular item is ready.
00592     if (!time_to_schedule(*to_examine)) {
00593       // this one is no longer relevant.
00594       timely -= to_examine->item_id().raw_id();
00595       continue;
00596     }
00597 
00598     to_examine = dynamic_cast<schedulable *>(_schedule->remove(*iter));
00599       // reset the current item by taking over control.
00600     if (!to_examine) {
00601       LOG("logic error!  schedule item removed was blank!");
00602       continue;
00603     }
00604 
00605     _active_item = to_examine;  // set the active object.
00606     _schedule->close_writer(iter);
00607       // relinquish control during the processing of this item.  an iterator
00608       // must be re-opened before continuing the loop.
00609   
00610     outcome sched_ret = schedule_item(hits, *to_examine);
00611       // activate the item now.
00612 
00613     iter = _schedule->open_writer(safe_list::TAIL);
00614       // re-open the iterator so we're protected.
00615     _active_item = NIL;  // nobody active now.
00616     // decide upon the fate of this item based on its scheduling outcome.
00617     if ( (sched_ret == schedule_actor::FINISHED)
00618           || (sched_ret == schedule_actor::CANCEL) ) {
00619       // this one is done, so we're not going to add it back.
00620       LIFT_WEIGHT_AND_VERIFY(to_examine->actor_id(), to_examine->weight());
00621       _schedule->close_writer(iter);  // close list so we can change it.
00622       // now that the item's not listed as active, move up any waiting items.
00623       promote_actor(to_examine->actor_id());
00624       timely -= to_examine->item_id().raw_id();  // remove from consideration.
00625       WHACK(to_examine);
00626       // re-open iterator to keep going.
00627       iter = _schedule->open_writer(safe_list::TAIL);
00628     } else {
00629       // this item is healthy still; add it to the tail of the schedule.
00630       _schedule->insert(*iter, to_examine);
00631       // check if we were told to snooze on this item.
00632       if (sched_ret == schedule_actor::POSTPONE)
00633         timely -= to_examine->item_id().raw_id();
00634     }
00635     iter->jump_head();
00636       // pop back up to the top.  our exit conditions should ensure we
00637       // won't get trapped in a vicious cycle.
00638   }
00639   _schedule->close_writer(iter);
00640 }
00641 
00642 outcome scheduler::process_schedule(int hits_allowed)
00643 {
00644   FUNCDEF("process_schedule");
00645   time_stamp processing_began;  // when scheduling starts.
00646   int hits = 0;  // number of times we've activated an item.
00647 
00648   promote_queued_items();
00649     // move the entries that are queued if there's no other active entry
00650     // for the actor.  actors will be individually promoted during scheduling.
00651 
00652   // iterate through the schedule as long as we have time.
00653   while (hits < hits_allowed) {
00654     bool keep_running = find_timely_items(*_timely_to_run);
00655 #ifdef DEBUG_SCHEDULER
00656     if (_timely_to_run->elements()) {
00657       istring list;
00658       for (int i = 0; i < _timely_to_run->elements(); i++) {
00659         if (i != 0) list += ", ";
00660         list += isprintf("%d", (*_timely_to_run)[i]);
00661       }
00662       LOG(istring("timely list: ") + list);
00663     }
00664 #endif
00665     if (!keep_running) break;
00666     crunch_on_timely_items(hits, hits_allowed, *_timely_to_run);
00667   }
00668 
00669   // record the accounting information.
00670   int time_taken = int(time_stamp().value() - processing_began.value());
00671   if (time_taken) _processing_durations->add(time_taken);
00672 
00673 //#ifdef DEBUG_SCHEDULER
00674   if (hits > 140)
00675     LOG(istring(istring::SPRINTF, "scheduler got in %d hits.", hits));
00676 //#endif
00677 
00678   if (hits >= hits_allowed) return MORE_LEFT;
00679     // not everything was totally finished with.
00680   return OKAY;
00681 }
00682 
00683 outcome scheduler::check_expiration(schedulable &to_check,
00684     schedule_actor &actor)
00685 {
00686   FUNCDEF("check_expiration");
00687   if (to_check.expiration().dead()) return CANCEL;
00688   if (!to_check.expiration().due()) return OKAY;
00689 
00690   // invoke the actor's expiration processing.
00691   outcome ret = actor.expiration(to_check);
00692   to_check.expiration().kabump();
00693     // reset the timer, regardless of the outcome.
00694 
00695   // deal with the expiration appropriately.
00696   switch (ret.value()) {
00697     // treated as fairly fresh.
00698     case schedule_actor::POSTPONE: case schedule_actor::OKAY:
00699       return OKAY;
00700     // treated as expired.
00701     case schedule_actor::FINISHED: case schedule_actor::CANCEL:
00702       return CANCEL;
00703     // unexpected, so expire it.
00704     default: return CANCEL;
00705   }
00706 }
00707 
00708 outcome scheduler::mark_dead(scheduling_id to_mark)
00709 {
00710   FUNCDEF("mark_dead");
00711   schedule_iterator *to_zap = find(to_mark);
00712   if (!to_zap) return NOT_FOUND;
00713   to_zap->access()->expiration().reset(0, 0);
00714   close_iterator(to_zap);
00715   return OKAY;
00716 }
00717 
00718 outcome scheduler::remove_by_actor(int actor_id)
00719 {
00720   FUNCDEF("remove_by_actor");
00721   safe_list_write_iterator *iter = _schedule->open_writer();
00722   while (!iter->is_tail()) {
00723     schedulable *found = dynamic_cast<schedulable *>(iter->access());
00724     if (found && (found->actor_id() == actor_id) ) {
00725       LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00726       _schedule->zap(*iter);
00727       continue;
00728     }
00729     iter->next();
00730   }
00731 
00732   // clear the pending sends for this actor also.
00733   _pending_sends->close_out(actor_id);
00734 
00735   _schedule->close_writer(iter);
00736   return OKAY;
00737 }
00738 
00739 void scheduler::schedule_text_form(string_array &info, int formal(indent))
00740 {
00741 //hmmm: use the indent.
00742   info.reset();
00743   safe_list_read_iterator *iter = _schedule->open_reader();
00744   if (_active_item)
00745     info += istring("active => ") + _active_item->schedulable_text_form();
00746   while (!iter->is_tail()) {
00747     const schedulable *found = dynamic_cast<const schedulable *>
00748         (iter->observe());
00749     if (found) info += found->schedulable_text_form();
00750     iter->next();
00751   }
00752   _schedule->close_reader(iter);
00753 }
00754 
00755 void scheduler::pending_text_form(istring &info, int formal(indent))
00756 {
00757 //hmmm: use the indent.
00758   safe_list_read_iterator *iter = _schedule->open_reader();
00759   _pending_sends->show(info);
00760   _schedule->close_reader(iter);
00761 }
00762 
00763 void scheduler::memory_text_form(istring &info, int indent)
00764 { info = _weight_tracker->text_form(indent); }
00765 
00766 void scheduler::promote_queued_items()
00767 {
00768   FUNCDEF("promote_queued_items");
00769   safe_list_write_iterator *iter = _schedule->open_writer();
00770 
00771   _pending_sends->clean_up();
00772     // shake out any dead mailboxes so we're not promoting unnecessarily.
00773 
00774   _pending_sends->get_ids(*_queued_actors);
00775     // get those actors that have any contents.
00776 
00777   _schedule->close_writer(iter);
00778 
00779   // try to promote each actor with things that need handling.
00780   for (int i = 0; i < _queued_actors->elements(); i++) {
00781     int to_promote = (*_queued_actors)[i];
00782     promote_actor(to_promote);
00783   }
00784 }
00785 
00786 void scheduler::promote_actor(int actor_id)
00787 {
00788   FUNCDEF("promote_actor");
00789   safe_list_write_iterator *iter = _schedule->open_writer();
00790 
00791   if (!_pending_sends->waiting(actor_id)) {
00792     // there wouldn't be anything to send, so why bother?
00793     _schedule->close_writer(iter);
00794     return;
00795   }
00796 
00797   if (_active_item && (_active_item->actor_id() == actor_id) ) {
00798     // the active guy actually belongs to this one, so we can't promote.
00799     _schedule->close_writer(iter);
00800     return;
00801   }
00802 
00803   while (!iter->is_tail()) {
00804     schedulable *examining = dynamic_cast<schedulable *>(iter->access());
00805     if (examining && (examining->actor_id() == actor_id)) {
00806       // this actor already has something in place.
00807       _schedule->close_writer(iter);
00808       return;
00809     }
00810     iter->next();
00811   }
00812 
00813   // we've guaranteed that nothing is in the scheduler for the actor.
00814 
00815   letter *next_letter = NIL;
00816   if (!_pending_sends->pick_up(actor_id, next_letter)) {
00817     // what?  nothing to deliver...
00818 #ifdef DEBUG_SCHEDULER
00819     LOG(istring(istring::SPRINTF, "huh? actor %d had no pending sends, "
00820         "but we checked before and it did.", actor_id));
00821 #endif
00822     _schedule->close_writer(iter);
00823     return;
00824   }
00825 
00826   waiting_item *ready_item = (waiting_item *)next_letter;
00827   // finally, add the item into the scheduler.
00828 #ifdef DEBUG_SCHEDULER
00829   LOG(istring(istring::SPRINTF, "adding item %d for actor %d into "
00830       "schedule from pending.", ready_item->_held->item_id().raw_id(),
00831       actor_id));
00832   int last_sched_id = ready_item->_held->item_id().raw_id();
00833 #endif
00834 
00835   _schedule->insert(*iter, ready_item->_held);
00836   WHACK(next_letter);
00837 
00838 #ifdef DEBUG_SCHEDULER
00839   // test that it's not missing after the insert.
00840   iter->jump_head();
00841   bool found_it = false;
00842   for ( ; !iter->is_tail(); iter->next()) {
00843     schedulable *found = dynamic_cast<schedulable *>(iter->access());
00844     if (found->item_id().raw_id() == last_sched_id) found_it = true;
00845   }
00846   if (!found_it)
00847     deadly_error("scheduler", "promote actor",
00848         "missing schedule id we just added!");
00849 #endif
00850 
00851   _schedule->close_writer(iter);
00852 }
00853 
00854 outcome removal_applier(letter &to_promote, int uid, void *datalink)
00855 {
00856   #define static_class_name() "scheduler"
00857   FUNCDEF("removal_applier");
00858   if (uid) {}
00859   waiting_item *item = (waiting_item *)&to_promote;
00860   int to_whack = *(int *)datalink;
00861 
00862 #ifdef DEBUG_SCHEDULER
00863   if (item->_held->item_id() == to_whack)
00864     LOG(istring(istring::SPRINTF, "whacking item %x for actor %d "
00865         "from pending.", to_whack, uid));
00866 #endif
00867 
00868   if (item->_held->item_id() == to_whack) {
00869     WHACK(item->_held);   // clean the item for real; the husk is tossed after.
00870     return mailbox::APPLY_WHACK_STOP;
00871   }
00872   return mailbox::OKAY;
00873   #undef class_name
00874 }
00875 
00876 outcome scheduler::remove_by_id(scheduling_id to_remove)
00877 {
00878   FUNCDEF("remove_by_id");
00879   safe_list_write_iterator *iter = _schedule->open_writer();
00880   bool found_it = false;
00881   while (!iter->is_tail()) {
00882     schedulable *found = dynamic_cast<schedulable *>(iter->access());
00883     if (found->item_id() == to_remove) {
00884       LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00885 #ifdef DEBUG_SCHEDULER
00886       LOG(istring(istring::SPRINTF, "found item %d for actor %d to whack.",
00887           to_remove.raw_id(), found->actor_id()));
00888 #endif
00889       _schedule->zap(*iter);
00890       found_it = true;
00891       break;
00892     }
00893     iter->next();
00894   }
00895 
00896   if (!found_it) {
00897     // clean any with that id out of the pending items also.
00898     int id = to_remove.raw_id();
00899     _pending_sends->apply(removal_applier, (void *)&id);
00900   }
00901 
00902   _schedule->close_writer(iter);
00903   return OKAY;
00904 }
00905 
00906 outcome filter_actor_applier(letter &to_promote, int uid, void *datalink)
00907 {
00908   #undef static_class_name
00909   #define static_class_name() "scheduler"
00910   FUNCDEF("filter_actor_applier");
00911   if (uid) {}
00912   waiting_item *item = (waiting_item *)&to_promote;
00913   int_set *healthy_actors = (int_set *)datalink;
00914 
00915   if (!healthy_actors->member(item->_held->actor_id())) {
00916 #ifdef DEBUG_SCHEDULER
00917     LOG(istring(istring::SPRINTF, "whacking item %d for actor %d "
00918         "from pending.", item->_held->actor_id(), uid));
00919 #endif
00920     WHACK(item->_held);   // clean the item for real; the husk is tossed after.
00921     return mailbox::APPLY_WHACK;
00922   }
00923 
00924   return mailbox::OKAY;
00925   #undef class_name
00926 }
00927 
00928 void scheduler::filter_missing_actors(const int_set &valid_actors)
00929 {
00930   FUNCDEF("filter_missing_actors");
00931   safe_list_write_iterator *iter = _schedule->open_writer();
00932   while (!iter->is_tail()) {
00933     schedulable *found = dynamic_cast<schedulable *>(iter->access());
00934     if (found && !valid_actors.member(found->actor_id())) {
00935       // this one seems bad, since it's not in our list.
00936       LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00937 #ifdef DEBUG_SCHEDULER
00938       LOG(istring(istring::SPRINTF, "cleaning item %d for dead conn %d.",
00939           found->item_id().raw_id(), found->actor_id()));
00940 #endif
00941       _schedule->zap(*iter);
00942     } else
00943       iter->next();  // only go to next node if we haven't changed the list.
00944   }
00945 
00946   // clean any out of the pending queue also.
00947   _pending_sends->apply