00001 #ifndef THREAD_CABINET_IMPLEMENTATION_FILE
00002 #define THREAD_CABINET_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "ithread.h"
00019 #include "roller.cpp"
00020 #include "thread_cabinet.h"
00021
00022 #include <basis/set.cpp>
00023 #include <basis/log_base.h>
00024 #include <basis/mutex.h>
00025 #include <basis/portable.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <data_struct/unique_id.h>
00028
00029 #undef LOCKIT
00030 #define LOCKIT auto_synchronizer l(*_lock)
00031
00032
00033 #undef LOG
00034 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00035
00036
00037
00038
00040
00041 class thread_record
00042 {
00043 public:
00044 ithread *_thread;
00045 unique_int _id;
00046
00047 thread_record(const unique_int &id, ithread *t)
00048 : _thread(t), _id(id) {}
00049
00050 ~thread_record() {
00051 _thread->stop();
00052 WHACK(_thread);
00053 }
00054 };
00055
00057
00058 class thread_amorph : public amorph<thread_record>
00059 {
00060 public:
00061 };
00062
00064
00065 thread_cabinet::thread_cabinet()
00066 : _lock(new mutex),
00067 _threads(new thread_amorph),
00068 _next_id(new int_roller(1, MAXINT - 1)),
00069 _no_additions(0)
00070 {
00071 }
00072
00073 thread_cabinet::~thread_cabinet()
00074 {
00075 WHACK(_threads);
00076 WHACK(_lock);
00077 WHACK(_next_id);
00078 }
00079
00080 int thread_cabinet::threads() const { return _threads->elements(); }
00081
00082 unique_int thread_cabinet::add_thread(ithread *to_add, bool start_it,
00083 void *parm)
00084 {
00085 FUNCDEF("add_thread");
00086 LOCKIT;
00087 if (_no_additions) {
00088 #ifdef DEBUG_THREAD_CABINET
00089 LOG("no additions flag is true; destroying the thread and failing out.");
00090 #endif
00091
00092 WHACK(to_add);
00093 return 0;
00094 }
00095 int use_id = _next_id->next_id();
00096 if (start_it) {
00097 to_add->start(parm);
00098 } else {
00099 #ifdef DEBUG_THREAD_CABINET
00100 if (to_add->thread_finished())
00101 LOG(isprintf("thread %x is not going to be started and it "
00102 "hasn't started yet!", to_add));
00103 #endif
00104 }
00105 _threads->append(new thread_record(use_id, to_add));
00106 return use_id;
00107 }
00108
00109 bool thread_cabinet::any_running() const
00110 {
00111 LOCKIT;
00112 for (int i = 0; i < _threads->elements(); i++) {
00113 if (_threads->borrow(i)->_thread->thread_started()) return true;
00114 }
00115 return false;
00116 }
00117
00118 void thread_cabinet::start_all(void *ptr)
00119 {
00120 LOCKIT;
00121 for (int i = 0; i < _threads->elements(); i++) {
00122 if (_threads->borrow(i)->_thread->thread_finished()) {
00123 _threads->borrow(i)->_thread->start(ptr);
00124 }
00125 }
00126 }
00127
00128 void thread_cabinet::cancel_all()
00129 {
00130 FUNCDEF("cancel_all");
00131 {
00132 LOCKIT;
00133 _no_additions++;
00134 for (int i = 0; i < _threads->elements(); i++) {
00135 _threads->borrow(i)->_thread->cancel();
00136 }
00137 }
00138 LOCKIT;
00139 _no_additions--;
00140 if (_no_additions < 0)
00141 continuable_error(class_name(), func, "error in logic regarding "
00142 "no additions.");
00143 }
00144
00145 void thread_cabinet::stop_all()
00146 {
00147 FUNCDEF("stop_all");
00148 {
00149 LOCKIT;
00150 _no_additions++;
00151 }
00152 cancel_all();
00153
00154 portable::sleep_ms(20);
00155 while (true) {
00156 LOCKIT;
00157 if (!_threads->elements()) break;
00158 clean_debris();
00159 portable::sleep_ms(20);
00160 }
00161 LOCKIT;
00162 _no_additions--;
00163 if (_no_additions < 0)
00164 continuable_error(class_name(), func, "error in logic regarding "
00165 "no additions.");
00166 }
00167
00168 bool thread_cabinet::zap_thread(const unique_int &to_whack)
00169 {
00170 LOCKIT;
00171 for (int i = 0; i < _threads->elements(); i++) {
00172 if (_threads->borrow(i)->_id == to_whack) {
00173
00174 _threads->zap(i, i);
00175 return true;
00176 }
00177 }
00178 return false;
00179 }
00180
00181 bool thread_cabinet::cancel_thread(const unique_int &to_cancel)
00182 {
00183 LOCKIT;
00184 for (int i = 0; i < _threads->elements(); i++) {
00185 if (_threads->borrow(i)->_id == to_cancel) {
00186
00187 _threads->borrow(i)->_thread->cancel();
00188 return true;
00189 }
00190 }
00191 return false;
00192 }
00193
00194 void thread_cabinet::clean_debris()
00195 {
00196 FUNCDEF("clean_debris");
00197 LOCKIT;
00198 for (int i = 0; i < _threads->elements(); i++) {
00199 if (_threads->borrow(i)->_thread->thread_finished()) {
00200
00201 #ifdef DEBUG_THREAD_CABINET
00202 LOG(isprintf("clearing thread %x out.", _threads->borrow(i)->_thread));
00203 #endif
00204 _threads->zap(i, i);
00205 i--;
00206 }
00207 }
00208 }
00209
00210 int_set thread_cabinet::thread_ids() const
00211 {
00212 LOCKIT;
00213 int_set to_return;
00214 for (int i = 0; i < _threads->elements(); i++)
00215 to_return += _threads->borrow(i)->_id.raw_id();
00216 return to_return;
00217 }
00218
00219 ithread *thread_cabinet::get_thread(int index)
00220 {
00221 LOCKIT;
00222 thread_record *rec = _threads->borrow(index);
00223 if (rec) return rec->_thread;
00224 return NIL;
00225 }
00226
00227
00228 #endif //THREAD_CABINET_IMPLEMENTATION_FILE
00229