00001 #ifndef SCHEDULER_IMPLEMENTATION_FILE
00002 #define SCHEDULER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "implem_only.h"
00025 #include "schedulable.h"
00026 #include "schedule_actor.h"
00027 #include "scheduler.h"
00028
00029 #include <basis/istring.h>
00030 #include <basis/mutex.h>
00031 #include <basis/portable.h>
00032 #include <basis/set.cpp>
00033 #include <mathematics/averager.cpp>
00034 #include <data_struct/memory_limiter.h>
00035 #include <mechanisms/ithread.h>
00036 #include <nodes/safe_list.h>
00037 #include <nodes/safe_node.h>
00038 #include <post_office/letter.h>
00039 #include <post_office/mailbox.h>
00040
00041 using namespace basis;
00042 using namespace nodes;
00043
00044
00045
00046
00047 const int DURATION_AVERAGER_SIZE = 20;
00048
00049
00050
00051
00052 const int ENFORCED_SCHEDULER_PAUSE = 42;
00053
00054
00055
00056
00057 const int SCHEDULE_HITS_ALLOWED = 1480;
00058
00059
00060 #define LIFT_WEIGHT_AND_VERIFY(id, size) \
00061 if (!_weight_tracker->record_deletion(id, size)) { \
00062 LOG(istring(istring::SPRINTF, "size for actor %d is botched!", id)); \
00063 }
00064
00066
00067 class waiting_item : public letter
00068 {
00069 public:
00070 schedulable *_held;
00071
00072 istring text_form() const
00073 { if (_held) return _held->schedulable_text_form(); else return ""; }
00074
00075 waiting_item(schedulable *to_hold) : _held(to_hold) {}
00076 };
00077
00079
00080
00081
00082
00083 schedule_iterator::schedule_iterator(const scheduler &mgr)
00084 : _manager(mgr),
00085 _hidden_iterator(NIL)
00086 {}
00087
00088 schedule_iterator::~schedule_iterator() { _hidden_iterator = NIL; }
00089
00090 void schedule_iterator::next() { _hidden_iterator->next(); }
00091
00092 void schedule_iterator::previous() { _hidden_iterator->previous(); }
00093
00094 schedulable *schedule_iterator::access()
00095 { return dynamic_cast<schedulable *>(_hidden_iterator->access()); }
00096
00097 bool schedule_iterator::is_head() { return _hidden_iterator->is_head(); }
00098
00099 bool schedule_iterator::is_tail() { return _hidden_iterator->is_tail(); }
00100
00101 void schedule_iterator::jump_head() { _hidden_iterator->jump_head(); }
00102
00103 void schedule_iterator::jump_tail() { _hidden_iterator->jump_tail(); }
00104
00106
00107 class schedule_driver : public ithread
00108 {
00109 public:
00110 schedule_driver(scheduler &to_drive);
00111 ~schedule_driver();
00112
00113 IMPLEMENT_CLASS_NAME("schedule_driver");
00114
00115 private:
00116 scheduler &_to_drive;
00117
00118 void perform_activity(void *thread_data);
00119
00120 };
00121
00123
00124 schedule_driver::schedule_driver(scheduler &to_drive)
00125 : ithread(to_drive.snooze_interval(), ithread::SLACK_INTERVAL),
00126 _to_drive(to_drive)
00127 {}
00128
00129 schedule_driver::~schedule_driver() { stop(); }
00130
00131 void schedule_driver::perform_activity(void *formal(thread_data))
00132 {
00133 FUNCDEF("perform_activity");
00134 outcome ret;
00135
00136 time_stamp start_time;
00137 try {
00138
00139 ret = _to_drive.process_schedule(SCHEDULE_HITS_ALLOWED);
00140 } catch (...) {
00141 LOG("an exception has occurred during schedule driving!");
00142 }
00143
00144 time_stamp stop_time;
00145 int duration = int(stop_time.value() - start_time.value());
00146
00147 if (duration > 500)
00148 LOG(istring(istring::SPRINTF, "scheduling took %d ms.", duration));
00149
00150 if (ret == scheduler::MORE_LEFT) {
00151
00152 reschedule(ENFORCED_SCHEDULER_PAUSE);
00153 }
00154 }
00155
00157
00158 scheduler::scheduler(actor_mapper &mapper, int time_per, int maximum_weight,
00159 int max_actor_weight, int snooze_interval)
00160 : _mapper(mapper),
00161 _time_per_item(time_per),
00162 _schedule(new safe_list),
00163 _processing_durations(new int_averager(DURATION_AVERAGER_SIZE, false)),
00164 _activation_durations(new int_averager(DURATION_AVERAGER_SIZE, false)),
00165 _item_durations(new int_averager(DURATION_AVERAGER_SIZE, false)),
00166 _bandwidths(new int_averager(DURATION_AVERAGER_SIZE, false)),
00167 _active_item(NIL),
00168 _weight_tracker(new memory_limiter(maximum_weight, max_actor_weight)),
00169 _pending_sends(new mailbox),
00170 _timely_to_run(new int_set),
00171 _queued_actors(new int_set),
00172 _snooze_interval(snooze_interval),
00173 _driver(new schedule_driver(*this))
00174 {}
00175
00176 scheduler::~scheduler()
00177 {
00178 FUNCDEF("destructor");
00179
00180 _driver->stop();
00181
00182
00183 safe_list_write_iterator *iter = _schedule->open_writer();
00184 if (!_schedule->empty(*iter)) {
00185 #ifdef DEBUG_SCHEDULER
00186 LOG("Some items remain in the schedule!...");
00187
00188 _schedule->close_writer(iter);
00189 string_array dump_list;
00190 schedule_text_form(dump_list, 0);
00191 istring dump;
00192 for (int i = 0; i < dump_list.length(); i++)
00193 dump += dump_list[i] + log_base::platform_ending();
00194 LOG(dump);
00195
00196 dump.reset();
00197 pending_text_form(dump, 0);
00198 for (int j = 0; j < dump_list.length(); j++)
00199 dump += dump_list[j] + log_base::platform_ending();
00200 LOG(dump);
00201
00202
00203
00204 iter = _schedule->open_writer();
00205 #endif
00206 while (!iter->is_tail()) {
00207 schedulable *found = dynamic_cast<schedulable *>(iter->access());
00208 if (found) LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00209 _schedule->zap(*iter);
00210 }
00211 }
00212 _schedule->close_writer(iter);
00213 WHACK(_timely_to_run);
00214 WHACK(_queued_actors);
00215 WHACK(_schedule);
00216 WHACK(_processing_durations);
00217 WHACK(_activation_durations);
00218 WHACK(_item_durations);
00219 WHACK(_bandwidths);
00220 WHACK(_weight_tracker);
00221 WHACK(_pending_sends);
00222 WHACK(_driver);
00223 }
00224
00225 const int_averager &scheduler::item_durations() const
00226 { return *_item_durations; }
00227
00228 const int_averager &scheduler::bandwidths() const
00229 { return *_bandwidths; }
00230
00231 const int_averager &scheduler::processing_durations() const
00232 { return *_processing_durations; }
00233
00234 const int_averager &scheduler::activation_durations() const
00235 { return *_activation_durations; }
00236
00237 const schedulable *scheduler::active_item(schedule_iterator &formal(to_use)) const
00238 { return _active_item; }
00239
00240 void scheduler::start_scheduling() { _driver->start(NIL); }
00241
00242 void scheduler::stop_scheduling() { _driver->stop(); }
00243
00244 schedule_iterator *scheduler::open_iterator(common::list_positions where)
00245 {
00246 if ( (where != common::HEAD) && (where != common::TAIL) )
00247 return NIL;
00248 safe_list_write_iterator *iter = _schedule->open_writer(where);
00249 schedule_iterator *to_return = new schedule_iterator(*this);
00250 to_return->_hidden_iterator = iter;
00251 return to_return;
00252 }
00253
00254 void scheduler::close_iterator(schedule_iterator * &to_close)
00255 {
00256 if (!to_close) return;
00257 if (to_close->_hidden_iterator) {
00258 _schedule->close_writer(to_close->_hidden_iterator);
00259 to_close->_hidden_iterator = NIL;
00260 }
00261 WHACK(to_close);
00262 }
00263
00264 void scheduler::zap(schedule_iterator &to_use)
00265 {
00266 FUNCDEF("zap");
00267 schedulable *found = dynamic_cast<schedulable *>(to_use.access());
00268 LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00269 _schedule->zap(*to_use._hidden_iterator);
00270 }
00271
00272 schedule_iterator *scheduler::find(scheduling_id to_find)
00273 {
00274 safe_list_write_iterator *iter = _schedule->open_writer();
00275 while (!iter->is_tail()) {
00276 schedulable *examining = dynamic_cast<schedulable *>(iter->access());
00277 if (examining && (examining->item_id() == to_find)) {
00278 schedule_iterator *to_return = new schedule_iterator(*this);
00279 to_return->_hidden_iterator = iter;
00280 return to_return;
00281 }
00282 iter->next();
00283 }
00284 _schedule->close_writer(iter);
00285 return NIL;
00286 }
00287
00288 bool scheduler::items_scheduled(schedule_iterator &iter, int actor)
00289 {
00290
00291 const schedulable *active = active_item(iter);
00292 if (active && (active->actor_id() == actor))
00293 return true;
00294
00295
00296 iter.jump_head();
00297 while (!iter.is_tail()) {
00298 schedulable *examining = dynamic_cast<schedulable *>(iter.access());
00299 if (examining && (examining->actor_id() == actor))
00300 return true;
00301 iter.next();
00302 }
00303
00304 return false;
00305 }
00306
00307 outcome scheduler::do_preprocess(schedulable &to_preprocess)
00308 {
00309 FUNCDEF("do_preprocess");
00310
00311 actor_mapping *actor = _mapper.open_actor(to_preprocess.actor_id());
00312 if (!actor) {
00313
00314 LOG(istring(istring::SPRINTF, "could not find actor for id %d; "
00315 "cancelling item.", to_preprocess.actor_id()));
00316 return CANCEL;
00317 }
00318
00319
00320
00321 to_preprocess.stats().entry = time_stamp();
00322
00323
00324 to_preprocess.expiration().kabump();
00325
00326 outcome ret = actor->_actor.preprocess(to_preprocess);
00327 _mapper.close_actor(actor);
00328 to_preprocess.stats().last_outcome = ret.value();
00329 if (ret != schedule_actor::OKAY) return CANCEL;
00330 return OKAY;
00331 }
00332
00333 outcome scheduler::add(schedulable *to_add, bool single_file)
00334 {
00335 FUNCDEF("add");
00336 if (!to_add) return BAD_INPUT;
00337 if (!to_add->item_id()) {
00338 WHACK(to_add);
00339 return BAD_INPUT;
00340 }
00341
00342
00343
00344 schedule_iterator *found = find(to_add->item_id());
00345 if (found) {
00346 LOG(istring(istring::SPRINTF, "item %d is already in schedule!",
00347 to_add->item_id().raw_id()));
00348 close_iterator(found);
00349 WHACK(to_add);
00350 return BAD_INPUT;
00351 }
00352
00353
00354 safe_list_write_iterator *iter = _schedule->open_writer(safe_list::TAIL);
00355
00356
00357 if (!_weight_tracker->okay_allocation(to_add->actor_id(), to_add->weight())) {
00358 #ifdef DEBUG_SCHEDULER
00359 LOG(isprintf("we are too bloated for item %d!",
00360 to_add->item_id().raw_id()));
00361 #endif
00362 _schedule->close_writer(iter);
00363 WHACK(to_add);
00364 return NO_SPACE;
00365 }
00366
00367
00368 to_add->stats().last_outcome = int(schedule_actor::OKAY);
00369
00370 if (single_file) {
00371
00372
00373
00374 waiting_item *new_guy = new waiting_item(to_add);
00375 _pending_sends->drop_off(to_add->actor_id(), new_guy);
00376 #ifdef DEBUG_SCHEDULER
00377 LOG(isprintf("adding item %d to the pending sends.",
00378 to_add->item_id().raw_id()));
00379 #endif
00380 _schedule->close_writer(iter);
00381 promote_actor(to_add->actor_id());
00382 return OKAY;
00383 }
00384
00385
00386 _schedule->insert(*iter, to_add);
00387
00388 #ifdef DEBUG_SCHEDULER
00389 LOG(isprintf("added item %d directly to schedule.",
00390 to_add->item_id().raw_id()));
00391 #endif
00392
00393
00394 _schedule->close_writer(iter);
00395 return OKAY;
00396 }
00397
00398 bool scheduler::time_to_schedule(schedulable &to_check)
00399 {
00400 if (to_check.start_time <= time_stamp()) return true;
00401 return false;
00402 }
00403
00404 bool scheduler::find_timely_items(int_set &timely)
00405 {
00406 FUNCDEF("find_timely_items");
00407 timely.reset();
00408 safe_list_write_iterator *iter;
00409 for (iter = _schedule->open_writer(); !iter->is_tail(); iter->next()) {
00410 if (!iter->access()) break;
00411
00412 schedulable *to_examine = dynamic_cast<schedulable *>(iter->access());
00413 if (!to_examine) {
00414
00415 LOG(istring("found bad schedulable in list!"));
00416 _schedule->zap(*iter);
00417 continue;
00418 }
00419
00420 if (timely.member(to_examine->item_id().raw_id())) continue;
00421
00422 if (time_to_schedule(*to_examine))
00423 timely += to_examine->item_id().raw_id();
00424 }
00425 _schedule->close_writer(iter);
00426 return !!timely.elements();
00427 }
00428
00429 outcome scheduler::schedule_item(int &hits, schedulable &to_process)
00430 {
00431 FUNCDEF("schedule_item");
00432 if (!to_process.preprocessed()) {
00433
00434 #ifdef DEBUG_SCHEDULER
00435 LOG(isprintf("preprocessing %d.", to_process.item_id().raw_id()));
00436 #endif
00437 outcome ret = do_preprocess(to_process);
00438 if (ret != OKAY) {
00439
00440 LOG(isprintf("preprocess disliked item %d, gave outcome ",
00441 to_process.item_id().raw_id()) + outcome_name(ret));
00442 return ret;
00443 }
00444 to_process.preprocessed(true);
00445 #ifdef DEBUG_SCHEDULER
00446 LOG(isprintf("preprocessed %d.", to_process.item_id().raw_id()));
00447 #endif
00448 }
00449
00450
00451 actor_mapping *actor = _mapper.open_actor(to_process.actor_id());
00452 if (!actor) {
00453 LOG(isprintf("could not find actor for id %d; now whacking item.",
00454 to_process.actor_id()));
00455 return schedule_actor::CANCEL;
00456 }
00457
00458 outcome current_outcome = schedule_actor::OKAY;
00459 if (to_process.expiration().dead()) {
00460
00461 LOG(isprintf("item %d is dead.", to_process.item_id().raw_id()));
00462 current_outcome = schedule_actor::CANCEL;
00463 } else if (to_process.expiration_interval) {
00464
00465
00466 outcome ret = check_expiration(to_process, actor->_actor);
00467 if (ret != scheduler::OKAY) {
00468
00469
00470 LOG(isprintf("item %d is cancelled! expired and not renewed.",
00471 to_process.item_id().raw_id())
00472 + istring(" got outcome ") + outcome_name(ret));
00473 current_outcome = schedule_actor::CANCEL;
00474 to_process.expiration().reset(0, 0);
00475 }
00476 } else {
00477
00478
00479 to_process.expiration().kabump();
00480 }
00481
00482 if (current_outcome != schedule_actor::OKAY) {
00483 return current_outcome;
00484 }
00485
00486
00487 time_stamp activation_start;
00488 #ifdef DEBUG_SCHEDULER
00489 LOG(isprintf("activating %d.", to_process.item_id().raw_id()));
00490 #endif
00491
00492 current_outcome = actor->_actor.activate(to_process, _time_per_item);
00493 if (current_outcome != schedule_actor::POSTPONE) {
00494
00495 hits++;
00496 #ifdef DEBUG_SCHEDULER
00497 LOG(isprintf("activated %d (outcome=%s)", to_process.item_id().raw_id(),
00498 schedule_actor::outcome_name(current_outcome)));
00499 #endif
00500 }
00501
00502 int activation_duration = int(time_stamp().value()
00503 - activation_start.value());
00504 if (activation_duration)
00505 _activation_durations->add(activation_duration, 1);
00506
00507 to_process.stats().last_outcome = current_outcome.value();
00508
00509
00510
00511
00512 if ( (current_outcome == schedule_actor::FINISHED)
00513 || (current_outcome == schedule_actor::CANCEL) ) {
00514 #ifdef DEBUG_SCHEDULER
00515 LOG(isprintf("postprocessing %d.", to_process.item_id().raw_id()));
00516 #endif
00517
00518 actor->_actor.postprocess(to_process);
00519
00520
00521
00522 #ifdef DEBUG_SCHEDULER
00523 LOG(isprintf("postprocessed %d.", to_process.item_id().raw_id()));
00524 #endif
00525 _mapper.close_actor(actor);
00526
00527
00528
00529
00530 to_process.stats().exit = time_stamp();
00531
00532 #ifdef DEBUG_SCHEDULER
00533 LOG(isprintf("deleting %d.", to_process.item_id().raw_id()));
00534 #endif
00535
00536
00537 if (to_process.stats().exit.initialized()
00538 && to_process.stats().entry.initialized()) {
00539 time_stamp exit = (const time_stamp &)to_process.stats().exit;
00540 time_stamp entry = (const time_stamp &)to_process.stats().entry;
00541 int overall_duration = int(exit.value() - entry.value());
00542 if (overall_duration) {
00543 _item_durations->add(overall_duration, 1);
00544 #ifdef DEBUG_SCHEDULER
00545 if (!to_process.weight())
00546 LOG("invalid weight (zero) for transmission!!!");
00547 #endif
00548 double raw_bandwidth = double(to_process.weight())
00549 / overall_duration;
00550
00551 int bandwidth = int(raw_bandwidth / 1024.0 * 1000.0);
00552 _bandwidths->add(bandwidth, 1);
00553 }
00554 }
00555
00556 return current_outcome;
00557 }
00558
00559
00560 _mapper.close_actor(actor);
00561
00562
00563 if (to_process.entry_interval
00564 && (current_outcome == schedule_actor::POSTPONE))
00565 to_process.start_time = time_stamp(to_process.entry_interval);
00566
00567 return current_outcome;
00568 }
00569
00570 void scheduler::crunch_on_timely_items(int &hits, int hits_allowed,
00571 int_set &timely)
00572 {
00573 FUNCDEF("crunch_on_timely_items");
00574 safe_list_write_iterator *iter;
00575 for (iter = _schedule->open_writer(); !iter->is_tail(); iter->next()) {
00576 if (hits > hits_allowed) break;
00577 if (!iter->access()) break;
00578 if (!timely.elements()) break;
00579
00580
00581 schedulable *to_examine = dynamic_cast<schedulable *>(iter->access());
00582 if (!to_examine) {
00583
00584 LOG(istring("found bad schedulable in list!"));
00585 _schedule->zap(*iter);
00586 continue;
00587 }
00588
00589 if (!timely.member(to_examine->item_id().raw_id()))
00590 continue;
00591
00592 if (!time_to_schedule(*to_examine)) {
00593
00594 timely -= to_examine->item_id().raw_id();
00595 continue;
00596 }
00597
00598 to_examine = dynamic_cast<schedulable *>(_schedule->remove(*iter));
00599
00600 if (!to_examine) {
00601 LOG("logic error! schedule item removed was blank!");
00602 continue;
00603 }
00604
00605 _active_item = to_examine;
00606 _schedule->close_writer(iter);
00607
00608
00609
00610 outcome sched_ret = schedule_item(hits, *to_examine);
00611
00612
00613 iter = _schedule->open_writer(safe_list::TAIL);
00614
00615 _active_item = NIL;
00616
00617 if ( (sched_ret == schedule_actor::FINISHED)
00618 || (sched_ret == schedule_actor::CANCEL) ) {
00619
00620 LIFT_WEIGHT_AND_VERIFY(to_examine->actor_id(), to_examine->weight());
00621 _schedule->close_writer(iter);
00622
00623 promote_actor(to_examine->actor_id());
00624 timely -= to_examine->item_id().raw_id();
00625 WHACK(to_examine);
00626
00627 iter = _schedule->open_writer(safe_list::TAIL);
00628 } else {
00629
00630 _schedule->insert(*iter, to_examine);
00631
00632 if (sched_ret == schedule_actor::POSTPONE)
00633 timely -= to_examine->item_id().raw_id();
00634 }
00635 iter->jump_head();
00636
00637
00638 }
00639 _schedule->close_writer(iter);
00640 }
00641
00642 outcome scheduler::process_schedule(int hits_allowed)
00643 {
00644 FUNCDEF("process_schedule");
00645 time_stamp processing_began;
00646 int hits = 0;
00647
00648 promote_queued_items();
00649
00650
00651
00652
00653 while (hits < hits_allowed) {
00654 bool keep_running = find_timely_items(*_timely_to_run);
00655 #ifdef DEBUG_SCHEDULER
00656 if (_timely_to_run->elements()) {
00657 istring list;
00658 for (int i = 0; i < _timely_to_run->elements(); i++) {
00659 if (i != 0) list += ", ";
00660 list += isprintf("%d", (*_timely_to_run)[i]);
00661 }
00662 LOG(istring("timely list: ") + list);
00663 }
00664 #endif
00665 if (!keep_running) break;
00666 crunch_on_timely_items(hits, hits_allowed, *_timely_to_run);
00667 }
00668
00669
00670 int time_taken = int(time_stamp().value() - processing_began.value());
00671 if (time_taken) _processing_durations->add(time_taken);
00672
00673
00674 if (hits > 140)
00675 LOG(istring(istring::SPRINTF, "scheduler got in %d hits.", hits));
00676
00677
00678 if (hits >= hits_allowed) return MORE_LEFT;
00679
00680 return OKAY;
00681 }
00682
00683 outcome scheduler::check_expiration(schedulable &to_check,
00684 schedule_actor &actor)
00685 {
00686 FUNCDEF("check_expiration");
00687 if (to_check.expiration().dead()) return CANCEL;
00688 if (!to_check.expiration().due()) return OKAY;
00689
00690
00691 outcome ret = actor.expiration(to_check);
00692 to_check.expiration().kabump();
00693
00694
00695
00696 switch (ret.value()) {
00697
00698 case schedule_actor::POSTPONE: case schedule_actor::OKAY:
00699 return OKAY;
00700
00701 case schedule_actor::FINISHED: case schedule_actor::CANCEL:
00702 return CANCEL;
00703
00704 default: return CANCEL;
00705 }
00706 }
00707
00708 outcome scheduler::mark_dead(scheduling_id to_mark)
00709 {
00710 FUNCDEF("mark_dead");
00711 schedule_iterator *to_zap = find(to_mark);
00712 if (!to_zap) return NOT_FOUND;
00713 to_zap->access()->expiration().reset(0, 0);
00714 close_iterator(to_zap);
00715 return OKAY;
00716 }
00717
00718 outcome scheduler::remove_by_actor(int actor_id)
00719 {
00720 FUNCDEF("remove_by_actor");
00721 safe_list_write_iterator *iter = _schedule->open_writer();
00722 while (!iter->is_tail()) {
00723 schedulable *found = dynamic_cast<schedulable *>(iter->access());
00724 if (found && (found->actor_id() == actor_id) ) {
00725 LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00726 _schedule->zap(*iter);
00727 continue;
00728 }
00729 iter->next();
00730 }
00731
00732
00733 _pending_sends->close_out(actor_id);
00734
00735 _schedule->close_writer(iter);
00736 return OKAY;
00737 }
00738
00739 void scheduler::schedule_text_form(string_array &info, int formal(indent))
00740 {
00741
00742 info.reset();
00743 safe_list_read_iterator *iter = _schedule->open_reader();
00744 if (_active_item)
00745 info += istring("active => ") + _active_item->schedulable_text_form();
00746 while (!iter->is_tail()) {
00747 const schedulable *found = dynamic_cast<const schedulable *>
00748 (iter->observe());
00749 if (found) info += found->schedulable_text_form();
00750 iter->next();
00751 }
00752 _schedule->close_reader(iter);
00753 }
00754
00755 void scheduler::pending_text_form(istring &info, int formal(indent))
00756 {
00757
00758 safe_list_read_iterator *iter = _schedule->open_reader();
00759 _pending_sends->show(info);
00760 _schedule->close_reader(iter);
00761 }
00762
00763 void scheduler::memory_text_form(istring &info, int indent)
00764 { info = _weight_tracker->text_form(indent); }
00765
00766 void scheduler::promote_queued_items()
00767 {
00768 FUNCDEF("promote_queued_items");
00769 safe_list_write_iterator *iter = _schedule->open_writer();
00770
00771 _pending_sends->clean_up();
00772
00773
00774 _pending_sends->get_ids(*_queued_actors);
00775
00776
00777 _schedule->close_writer(iter);
00778
00779
00780 for (int i = 0; i < _queued_actors->elements(); i++) {
00781 int to_promote = (*_queued_actors)[i];
00782 promote_actor(to_promote);
00783 }
00784 }
00785
00786 void scheduler::promote_actor(int actor_id)
00787 {
00788 FUNCDEF("promote_actor");
00789 safe_list_write_iterator *iter = _schedule->open_writer();
00790
00791 if (!_pending_sends->waiting(actor_id)) {
00792
00793 _schedule->close_writer(iter);
00794 return;
00795 }
00796
00797 if (_active_item && (_active_item->actor_id() == actor_id) ) {
00798
00799 _schedule->close_writer(iter);
00800 return;
00801 }
00802
00803 while (!iter->is_tail()) {
00804 schedulable *examining = dynamic_cast<schedulable *>(iter->access());
00805 if (examining && (examining->actor_id() == actor_id)) {
00806
00807 _schedule->close_writer(iter);
00808 return;
00809 }
00810 iter->next();
00811 }
00812
00813
00814
00815 letter *next_letter = NIL;
00816 if (!_pending_sends->pick_up(actor_id, next_letter)) {
00817
00818 #ifdef DEBUG_SCHEDULER
00819 LOG(istring(istring::SPRINTF, "huh? actor %d had no pending sends, "
00820 "but we checked before and it did.", actor_id));
00821 #endif
00822 _schedule->close_writer(iter);
00823 return;
00824 }
00825
00826 waiting_item *ready_item = (waiting_item *)next_letter;
00827
00828 #ifdef DEBUG_SCHEDULER
00829 LOG(istring(istring::SPRINTF, "adding item %d for actor %d into "
00830 "schedule from pending.", ready_item->_held->item_id().raw_id(),
00831 actor_id));
00832 int last_sched_id = ready_item->_held->item_id().raw_id();
00833 #endif
00834
00835 _schedule->insert(*iter, ready_item->_held);
00836 WHACK(next_letter);
00837
00838 #ifdef DEBUG_SCHEDULER
00839
00840 iter->jump_head();
00841 bool found_it = false;
00842 for ( ; !iter->is_tail(); iter->next()) {
00843 schedulable *found = dynamic_cast<schedulable *>(iter->access());
00844 if (found->item_id().raw_id() == last_sched_id) found_it = true;
00845 }
00846 if (!found_it)
00847 deadly_error("scheduler", "promote actor",
00848 "missing schedule id we just added!");
00849 #endif
00850
00851 _schedule->close_writer(iter);
00852 }
00853
00854 outcome removal_applier(letter &to_promote, int uid, void *datalink)
00855 {
00856 #define static_class_name() "scheduler"
00857 FUNCDEF("removal_applier");
00858 if (uid) {}
00859 waiting_item *item = (waiting_item *)&to_promote;
00860 int to_whack = *(int *)datalink;
00861
00862 #ifdef DEBUG_SCHEDULER
00863 if (item->_held->item_id() == to_whack)
00864 LOG(istring(istring::SPRINTF, "whacking item %x for actor %d "
00865 "from pending.", to_whack, uid));
00866 #endif
00867
00868 if (item->_held->item_id() == to_whack) {
00869 WHACK(item->_held);
00870 return mailbox::APPLY_WHACK_STOP;
00871 }
00872 return mailbox::OKAY;
00873 #undef class_name
00874 }
00875
00876 outcome scheduler::remove_by_id(scheduling_id to_remove)
00877 {
00878 FUNCDEF("remove_by_id");
00879 safe_list_write_iterator *iter = _schedule->open_writer();
00880 bool found_it = false;
00881 while (!iter->is_tail()) {
00882 schedulable *found = dynamic_cast<schedulable *>(iter->access());
00883 if (found->item_id() == to_remove) {
00884 LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00885 #ifdef DEBUG_SCHEDULER
00886 LOG(istring(istring::SPRINTF, "found item %d for actor %d to whack.",
00887 to_remove.raw_id(), found->actor_id()));
00888 #endif
00889 _schedule->zap(*iter);
00890 found_it = true;
00891 break;
00892 }
00893 iter->next();
00894 }
00895
00896 if (!found_it) {
00897
00898 int id = to_remove.raw_id();
00899 _pending_sends->apply(removal_applier, (void *)&id);
00900 }
00901
00902 _schedule->close_writer(iter);
00903 return OKAY;
00904 }
00905
00906 outcome filter_actor_applier(letter &to_promote, int uid, void *datalink)
00907 {
00908 #undef static_class_name
00909 #define static_class_name() "scheduler"
00910 FUNCDEF("filter_actor_applier");
00911 if (uid) {}
00912 waiting_item *item = (waiting_item *)&to_promote;
00913 int_set *healthy_actors = (int_set *)datalink;
00914
00915 if (!healthy_actors->member(item->_held->actor_id())) {
00916 #ifdef DEBUG_SCHEDULER
00917 LOG(istring(istring::SPRINTF, "whacking item %d for actor %d "
00918 "from pending.", item->_held->actor_id(), uid));
00919 #endif
00920 WHACK(item->_held);
00921 return mailbox::APPLY_WHACK;
00922 }
00923
00924 return mailbox::OKAY;
00925 #undef class_name
00926 }
00927
00928 void scheduler::filter_missing_actors(const int_set &valid_actors)
00929 {
00930 FUNCDEF("filter_missing_actors");
00931 safe_list_write_iterator *iter = _schedule->open_writer();
00932 while (!iter->is_tail()) {
00933 schedulable *found = dynamic_cast<schedulable *>(iter->access());
00934 if (found && !valid_actors.member(found->actor_id())) {
00935
00936 LIFT_WEIGHT_AND_VERIFY(found->actor_id(), found->weight());
00937 #ifdef DEBUG_SCHEDULER
00938 LOG(istring(istring::SPRINTF, "cleaning item %d for dead conn %d.",
00939 found->item_id().raw_id(), found->actor_id()));
00940 #endif
00941 _schedule->zap(*iter);
00942 } else
00943 iter->next();
00944 }
00945
00946
00947 _pending_sends->apply