00001 #ifndef BUFFER_MANAGER_IMPLEMENTATION_FILE
00002 #define BUFFER_MANAGER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "buffer_key.h"
00019 #include "buffer_manager.h"
00020 #include "implem_only.h"
00021
00022 #include <basis/log_base.h>
00023 #include <basis/mutex.h>
00024 #include <data_struct/amorph.cpp>
00025 #include <data_struct/memory_limiter.h>
00026 #include <mechanisms/time_stamp.h>
00027 #include <textual/string_manipulation.h>
00028
00029 #define LOCKIT auto_synchronizer l(*_buffer_lock);
00030
00031
00032 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print);
00033
00035
00036 class internal_buffer_list : public amorph<buffer_base> {};
00037
00039
00040 buffer_manager::buffer_manager(int total_memory, int connection_memory)
00041 : _buffer_lock(new mutex),
00042 _buffer_list(new internal_buffer_list()),
00043 _space_minder(new memory_limiter(total_memory, connection_memory))
00044 {}
00045
00046 buffer_manager::~buffer_manager()
00047 {
00048 WHACK(_buffer_list);
00049 WHACK(_buffer_lock);
00050 WHACK(_space_minder);
00051 }
00052
00053 int buffer_manager::buffers() const
00054 {
00055 LOCKIT;
00056 return _buffer_list->elements();
00057 }
00058
00059 int buffer_manager::total_memory_allowed() const
00060 {
00061 LOCKIT;
00062 return _space_minder->overall_limit();
00063 }
00064
00065 int buffer_manager::total_allocated() const
00066 {
00067 LOCKIT;
00068 return _space_minder->overall_usage();
00069 }
00070
00071 bool buffer_manager::okay_addition(buffer_base &to_check)
00072 {
00073 FUNCDEF("okay_addition");
00074 LOCKIT;
00075 int new_size = 0;
00076 if (to_check.whole() && !to_check.did_estimate())
00077 new_size = to_check.full_length();
00078 else new_size = to_check.guess_size();
00079 if (negative(new_size)) return false;
00080
00081 if (!_space_minder->okay_allocation(to_check.key().connection, new_size)) {
00082
00083
00084 find_space();
00085
00086 if (!_space_minder->okay_allocation(to_check.key().connection, new_size)) {
00087
00088 return false;
00089 }
00090 }
00091 return true;
00092 }
00093
00094 void buffer_manager::find_space()
00095 {
00096 FUNCDEF("find_space");
00097 LOCKIT;
00098
00099
00100
00101 for (int i = 0; i < _buffer_list->elements(); i++) {
00102 buffer_base *to_check = _buffer_list->borrow(i);
00103 if (to_check->did_estimate() && to_check->whole()) {
00104
00105
00106 int space_to_remove = to_check->guess_size() - to_check->full_length();
00107 if (space_to_remove < 0) {
00108
00109 LOG(istring(istring::SPRINTF, "estimated size was less than "
00110 "required amount! for conn id %d, buff id %d, estimate=%d, "
00111 "real_size=%d.", to_check->key().connection,
00112 to_check->key().buff_id.raw_id(),
00113 to_check->guess_size(), to_check->full_length()));
00114 continue;
00115 }
00116 to_check->reset_estimate();
00117 if (!_space_minder->record_deletion(to_check->key().connection,
00118 space_to_remove)) {
00119 LOG(istring(istring::SPRINTF, "failed to properly update "
00120 "buffer record for conn id %d, buff id %d.",
00121 to_check->key().connection, to_check->key().buff_id.raw_id()));
00122 }
00123 }
00124 }
00125 }
00126
00127 void buffer_manager::record_subtraction(buffer_base &to_check)
00128 {
00129 FUNCDEF("record_subtraction");
00130 LOCKIT;
00131 int old_size = 0;
00132 if (to_check.did_estimate() || !to_check.whole())
00133 old_size = to_check.guess_size();
00134 else old_size = to_check.full_length();
00135 if (negative(old_size)) return;
00136 if (!_space_minder->record_deletion(to_check.key().connection, old_size)) {
00137 LOG(istring(istring::SPRINTF, "failed to properly update "
00138 "buffer record for conn id %d, buff id %d.",
00139 to_check.key().connection, to_check.key().buff_id.raw_id()));
00140 }
00141 }
00142
00143 int buffer_manager::locked_find(int &start, const buffer_key &key)
00144 {
00145 FUNCDEF("locked_find");
00146 bounds_return(start, 0, _buffer_list->elements() - 1, buffer_base::BAD_INPUT);
00147
00148 for (int i = start; i < _buffer_list->elements(); i++) {
00149
00150
00151
00152 if ( (key.buff_id == _buffer_list->get(i)->key().buff_id)
00153 || (!key.buff_id
00154 && (_buffer_list->get(i)->key().connection == key.connection)
00155 && (!key.msg_id
00156 || (_buffer_list->get(i)->key().msg_id == key.msg_id)))) {
00157 start = i + 1;
00158 return i;
00159 }
00160 }
00161 return common::NOT_FOUND;
00162 }
00163
00164 outcome buffer_manager::add_buffer(buffer_base *to_add)
00165 {
00166 FUNCDEF("add_buffer");
00167 if (!to_add) return buffer_base::BAD_INPUT;
00168 LOCKIT;
00169 buffer_key key = to_add->key();
00170 if (!key.connection) {
00171
00172 LOG("buffer was passed in with a connection identifier of zero!");
00173 WHACK(to_add);
00174 return buffer_base::BAD_INPUT;
00175 }
00176 if (!key.buff_id) {
00177 LOG("buffer was passed in with a buffer identifier of zero!");
00178 WHACK(to_add);
00179 return buffer_base::BAD_INPUT;
00180 }
00181
00182
00183 int start = 0;
00184 if (locked_locate(start, key)) {
00185 LOG(istring(istring::SPRINTF, "the buffer %d already exists!!!",
00186 key.buff_id.raw_id()));
00187 WHACK(to_add);
00188 return buffer_base::IN_USE;
00189 }
00190
00191
00192
00193 if (!okay_addition(*to_add)) {
00194
00195 WHACK(to_add);
00196 return buffer_base::NO_SPACE;
00197 }
00198
00199
00200 _buffer_list->append(to_add);
00201 return buffer_base::OKAY;
00202 }
00203
00204 buffer_base *buffer_manager::lock_buffer(int &start, const buffer_key &key)
00205 {
00206 _buffer_lock->lock();
00207 buffer_base *to_return = locked_locate(start, key);
00208 if (!to_return) {
00209 _buffer_lock->unlock();
00210 return NIL;
00211 }
00212
00213 return to_return;
00214 }
00215
00216 void buffer_manager::unlock_buffer(buffer_base *to_unlock)
00217 {
00218 if (!to_unlock) return;
00219
00220 to_unlock = NIL;
00221 _buffer_lock->unlock();
00222 }
00223
00224 buffer_base *buffer_manager::locked_locate(int &start, const buffer_key &key)
00225 {
00226 FUNCDEF("locked_locate");
00227 int index = locked_find(start, key);
00228 if (negative(index)) return NIL;
00229 return _buffer_list->borrow(index);
00230 }
00231
00232 bool buffer_manager::zap(buffer_id to_whack)
00233 {
00234 FUNCDEF("zap");
00235 LOCKIT;
00236 int start = 0;
00237 buffer_key key(to_whack, 0, 0);
00238 int index = locked_find(start, key);
00239 if (negative(index)) return false;
00240
00241 key = _buffer_list->get(index)->key();
00242
00243 buffer_base *found = _buffer_list->acquire(index);
00244
00245 _buffer_list->zap(index, index);
00246
00247 record_subtraction(*found);
00248 WHACK(found);
00249 return true;
00250 }
00251
00252 bool buffer_manager::zap_match(int &start, const buffer_key &key)
00253 {
00254 FUNCDEF("zap_match");
00255 LOCKIT;
00256 int index = locked_find(start, key);
00257 if (negative(index)) return false;
00258
00259 buffer_base *found = _buffer_list->acquire(index);
00260
00261 _buffer_list->zap(index, index);
00262
00263 record_subtraction(*found);
00264 WHACK(found);
00265 return true;
00266 }
00267
00268 bool buffer_manager::zap_all_matches(const buffer_key &key)
00269 {
00270 FUNCDEF("zap_all_matches");
00271 LOCKIT;
00272 int start = 0;
00273 int index = locked_find(start, key);
00274 if (negative(index)) return false;
00275 while (non_negative(index)) {
00276
00277 buffer_base *found = _buffer_list->acquire(index);
00278
00279 _buffer_list->zap(index, index);
00280
00281 record_subtraction(*found);
00282 WHACK(found);
00283 start = 0;
00284 index = locked_find(start, key);
00285 }
00286 return true;
00287 }
00288
00289 void buffer_manager::zap_all()
00290 {
00291 FUNCDEF("zap_all");
00292 LOCKIT;
00293 _buffer_list->zap(0, _buffer_list->elements() - 1);
00294 _space_minder->reset();
00295 }
00296
00297 bool buffer_manager::zap_expired(istring &text_form, buffer_key &dead_key,
00298 int &start, int age, kind_to_find which)
00299 {
00300 FUNCDEF("zap_expired");
00301 LOCKIT;
00302 if (start > _buffer_list->elements() - 1) return false;
00303 time_stamp comparator(-age);
00304 for (int i = start; i < _buffer_list->elements(); i++) {
00305 if (!_buffer_list->get(i)->alive()) {
00306
00307 record_subtraction(*_buffer_list->borrow(i));
00308 _buffer_list->zap(i, i);
00309 i--;
00310 } else if ( (_buffer_list->get(i)->last_add() <= comparator)
00311 && ( (which == EITHER)
00312 || (_buffer_list->get(i)->whole() && (which == WHOLE))
00313 || (!_buffer_list->get(i)->whole() && (which == NON_WHOLE)) ) ) {
00314 start = i + 1;
00315 text_form = _buffer_list->get(i)->text_form();
00316 dead_key = _buffer_list->get(i)->key();
00317 record_subtraction(*_buffer_list->borrow(i));
00318 _buffer_list->zap(i, i);
00319 return true;
00320 }
00321 }
00322 return false;
00323 }
00324
00325 void buffer_manager::show_buffers(istring &dump, int indenting)
00326 {
00327 istring indent = string_manipulation::indentation(indenting);
00328 LOCKIT;
00329 if (!_buffer_list->elements()) {
00330 dump += indent + "No buffers are presently held."
00331 + log_base::platform_ending() + log_base::platform_ending();
00332 return;
00333 }
00334 for (int j = 0; j < _buffer_list->elements(); j++) {
00335 dump += indent + _buffer_list->get(j)->detailed_form();
00336 dump += log_base::platform_ending();
00337 }
00338 dump += log_base::platform_ending();
00339 dump += istring("Memory Usage: ") + _space_minder->text_form(indenting);
00340 dump += log_base::platform_ending();
00341 dump += log_base::platform_ending();
00342 }
00343
00344 buffer_base *buffer_manager::disengage(buffer_key &key)
00345 {
00346 FUNCDEF("disengage");
00347 LOCKIT;
00348 int start = 0;
00349 int index = locked_find(start, key);
00350 if (negative(index)) return NIL;
00351 buffer_base *to_return = _buffer_list->acquire(index);
00352 _buffer_list->zap(index, index);
00353 record_subtraction(*to_return);
00354 return to_return;
00355 }
00356
00357 outcome buffer_manager::extract(buffer_key &key, byte_array &to_fill,
00358 byte_array &attachment, int offset, bool remove_buffer)
00359 {
00360 FUNCDEF("extract");
00361 LOCKIT;
00362 int check_position = 0;
00363 while (check_position < buffers()) {
00364 buffer_base *buff = locked_locate(check_position, key);
00365 if (!buff) return buffer_base::INCOMPLETE;
00366 if (buff->whole()) {
00367 outcome ret = buff->dump(to_fill, offset);
00368 key = buff->key();
00369
00370 if (ret == buffer_base::OKAY) {
00371 buff->get_attachment(attachment);
00372 }
00373
00374
00375 if ( (ret == buffer_base::OKAY) && remove_buffer) {
00376 buffer_key key_to_whack = _buffer_list->get(check_position - 1)->key();
00377 if (!zap(key_to_whack.buff_id)) {
00378 LOG(istring(istring::SPRINTF, "could not whack buffer %d.",
00379 key_to_whack.buff_id.raw_id()));
00380 }
00381 }
00382 return ret;
00383 }
00384 }
00385 return buffer_base::INCOMPLETE;
00386 }
00387
00388
00389 #endif //BUFFER_MANAGER_IMPLEMENTATION_FILE
00390