00001 #ifndef BUFFER_IMPLEMENTATION_FILE
00002 #define BUFFER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "buffer.h"
00019 #include "buffer_key.h"
00020 #include "implem_only.h"
00021
00022 #include <basis/byte_array.h>
00023 #include <basis/mutex.h>
00024 #include <basis/set.cpp>
00025 #include <data_struct/bit_vector.h>
00026 #include <data_struct/span_manager.h>
00027 #include <mechanisms/time_stamp.h>
00028
00030
00031
00032
00033 class buffer_pack_array : public array<byte_array>
00034 {
00035 public:
00036 buffer_pack_array(int num = 0) : array<byte_array>(num) {}
00037 };
00038
00040
00041 buffer::buffer(const buffer_key &key, byte_array &contents,
00042 int packet_size)
00043 : _lock(new mutex),
00044 _owner_alerted(false),
00045 _alive(true),
00046 _maximum_packet(packet_size),
00047 _key(new buffer_key(key)),
00048 _actual_storage(new byte_array),
00049 _is_storage_active(true),
00050 _last_add(new time_stamp()),
00051 _attachment(new byte_array()),
00052 _packet_list(new buffer_pack_array(number_of_packets
00053 (int(contents.length()), packet_size))),
00054 _buffer_status(new span_manager(number_of_packets(int(contents.length()),
00055 packet_size))),
00056 _estimation(0),
00057 _references(new int_set)
00058 {
00059
00060 _actual_storage->snarf(contents);
00061
00062
00063 int_array status_init(2);
00064 status_init[0] = 0;
00065 status_init[1] = short(_packet_list->length() - 1);
00066 _buffer_status->update(status_init);
00067
00068
00069 int offset = 0;
00070 for (int i = 0; i < _packet_list->length(); i++) {
00071 int len = packet_size;
00072
00073 if (i == _packet_list->length() - 1)
00074 len = last_packet_size(int(_actual_storage->length()), packet_size);
00075
00076 _packet_list->use(i).reset(len, _actual_storage->access() + offset);
00077 offset += len;
00078 }
00079 }
00080
00081 buffer::buffer(const buffer_key &key, int total_packets, int maximum_packet)
00082 : _lock(new mutex),
00083 _owner_alerted(false),
00084 _alive(true),
00085 _maximum_packet(maximum_packet),
00086 _key(new buffer_key(key)),
00087 _actual_storage(new byte_array()),
00088 _is_storage_active(false),
00089 _last_add(new time_stamp()),
00090 _attachment(new byte_array()),
00091 _packet_list(new buffer_pack_array(total_packets)),
00092 _buffer_status(new span_manager(total_packets)),
00093 _estimation(0),
00094 _references(new int_set)
00095 {}
00096
00097 buffer::~buffer()
00098 {
00099 release();
00100 WHACK(_attachment);
00101 WHACK(_buffer_status);
00102 WHACK(_key);
00103 WHACK(_last_add);
00104 WHACK(_packet_list);
00105 WHACK(_lock);
00106 WHACK(_references);
00107 WHACK(_actual_storage);
00108 }
00109
00110 void buffer::reset_estimate() { _estimation = 0; }
00111
00112 bool buffer::did_estimate() const { return !!_estimation; }
00113
00114 int buffer::estimate() const { return _estimation; }
00115
00116 bool buffer::alive() const { return _alive; }
00117
00118 bool buffer::owner_alerted() const { return _owner_alerted; }
00119
00120 void buffer::set_alerted(bool owner_alerted)
00121 { _owner_alerted = owner_alerted; }
00122
00123 span_manager buffer::buffer_status() const
00124 { auto_synchronizer l(*_lock); return *_buffer_status; }
00125
00126 int buffer::references() const
00127 { auto_synchronizer l(*_lock); return _references->elements(); }
00128
00129 int_set buffer::get_references() const
00130 { auto_synchronizer l(*_lock); return *_references; }
00131
00132 void buffer::add_reference(int conn_id)
00133 { auto_synchronizer l(*_lock); *_references += conn_id; }
00134
00135 void buffer::remove_reference(int conn_id)
00136 { auto_synchronizer l(*_lock); *_references -= conn_id; }
00137
00138 void buffer::set_last_add(const time_stamp &new_last_add)
00139 { auto_synchronizer l(*_lock); *_last_add = new_last_add; }
00140
00141 time_stamp buffer::last_add() const
00142 { auto_synchronizer l(*_lock); return *_last_add; }
00143
00144 buffer_key buffer::key() const
00145 { auto_synchronizer l(*_lock); return *_key; }
00146
00147 void buffer::set_key(const buffer_key &to_set)
00148 { auto_synchronizer l(*_lock); *_key = to_set; }
00149
00150 void buffer::get_attachment(byte_array &storage) const
00151 { auto_synchronizer l(*_lock); storage = *_attachment; }
00152
00153 void buffer::set_attachment(const byte_array &new_attach)
00154 { auto_synchronizer l(*_lock); *_attachment = new_attach; }
00155
00156 int buffer::packets() const
00157 { auto_synchronizer l(*_lock); return _packet_list->length(); }
00158
00159 bool buffer::whole() const
00160 { auto_synchronizer l(*_lock); return _buffer_status->vector().whole(); }
00161
00162 bool buffer::empty() const
00163 { auto_synchronizer l(*_lock); return _buffer_status->vector().empty(); }
00164
00165 int_array buffer::packet_sizes() const
00166 {
00167 int_array to_return;
00168 auto_synchronizer l(*_lock);
00169 for (int i = 0; i < _packet_list->length(); i++)
00170 to_return += _packet_list->get(i).length();
00171 return to_return;
00172 }
00173
00174 void buffer::release()
00175 {
00176 FUNCDEF("release");
00177 auto_synchronizer l(*_lock);
00178 _last_add->reset();
00179 for (int i = 0; i < _packet_list->length(); i++)
00180 _packet_list->use(i).reset();
00181 WHACK(_actual_storage);
00182
00183 _is_storage_active = false;
00184 _key->connection = 0;
00185 _key->msg_id = 0;
00186 _packet_list->reset(0);
00187 _buffer_status->reset(0);
00188 _owner_alerted = false;
00189 _estimation = 0;
00190 _alive = true;
00191 }
00192
00193 istring buffer::text_form() const
00194 {
00195 auto_synchronizer l(*_lock);
00196 return istring(istring::SPRINTF, "[%d] size=%d alive=%s conn_id=%d "
00197 "msg_id=%d whole?=%s mod_time=%s", _key->buff_id.raw_id(),
00198 (whole()? full_length() : (_estimation? _estimation : -1) ),
00199 _alive? "true" : "false", _key->connection, _key->msg_id,
00200 whole()? "true":"false", _last_add->text_form().observe());
00201
00202
00203 }
00204
00205 istring buffer::detailed_form(bool really_detailed) const
00206 {
00207 FUNCDEF("detailed_form");
00208 auto_synchronizer l(*_lock);
00209 istring to_return(text_form() + istring(" "));
00210 to_return += _buffer_status->print_received_list();
00211 if (really_detailed) {
00212 to_return += " lengths: ";
00213 for (int i = 0; i < packets(); i++) {
00214 if (_buffer_status->vector().on(i))
00215 to_return += istring(istring::SPRINTF, "[%d=%d] ", i,
00216 _packet_list->get(i).length());
00217 else to_return += istring(istring::SPRINTF, "[%d=empty] ", i);
00218 }
00219 }
00220 return to_return;
00221 }
00222
00223 void buffer::release(int packet_number, release_style status_handling)
00224 {
00225 FUNCDEF("release [pack/style]");
00226 auto_synchronizer l(*_lock);
00227 if (!_alive) return;
00228 bounds_return(packet_number, 0, _packet_list->length() - 1, );
00229 if (_is_storage_active) return;
00230 _last_add->reset();
00231
00232 if (status_handling == RESET_STATUS)
00233 _buffer_status->vector().clear(packet_number);
00234 _packet_list->use(packet_number).reset();
00235 }
00236
00237 outcome buffer::store_packet(int packet_number, const byte_array &buffer)
00238 {
00239 FUNCDEF("store_packet");
00240 auto_synchronizer l(*_lock);
00241 if (!_alive) return BAD_INPUT;
00242 bounds_return(packet_number, 0, _packet_list->length() - 1, BAD_INPUT);
00243 if (_is_storage_active) return BAD_INPUT;
00244
00245 _last_add->reset();
00246 if (_buffer_status->vector().on(packet_number)) return IN_USE;
00247
00248 _packet_list->use(packet_number) = buffer;
00249 _buffer_status->vector().light(packet_number);
00250 return OKAY;
00251 }
00252
00253 byte_array buffer::get_packet(int packet_number) const
00254 {
00255 FUNCDEF("get_packet");
00256 auto_synchronizer l(*_lock);
00257 if (!_alive) return byte_array();
00258 bounds_return(packet_number, 0, _packet_list->length() - 1, byte_array());
00259 return _packet_list->get(packet_number);
00260 }
00261
00262 int buffer::packet_length(int packet_number) const
00263 {
00264 FUNCDEF("packet_length");
00265 auto_synchronizer l(*_lock);
00266 if (!_alive) return 0;
00267 bounds_return(packet_number, 0, _packet_list->length() - 1, 0);
00268 return _packet_list->get(packet_number).length();
00269 }
00270
00271 int buffer::full_length(int offset) const
00272 {
00273 FUNCDEF("full_length");
00274 auto_synchronizer l(*_lock);
00275 if (!_alive) return 0;
00276 if (offset < 0) return BAD_INPUT;
00277 int to_return = 0;
00278
00279 if (!whole()) return INCOMPLETE;
00280 for (int i = 0; i < _packet_list->length(); i++) {
00281
00282 int add_in = _packet_list->get(i).length() - offset;
00283 if (add_in < 0) return BAD_INPUT;
00284 to_return += add_in;
00285 }
00286 return to_return;
00287 }
00288
00289 int buffer::guess_size(int formal(offset))
00290 {
00291 auto_synchronizer l(*_lock);
00292
00293 if (!did_estimate())
00294 _estimation = _packet_list->length() * _maximum_packet;
00295 return _estimation;
00296 }
00297
00298 outcome buffer::dump(byte_array &storage_location, int offset)
00299 {
00300 FUNCDEF("dump");
00301 auto_synchronizer l(*_lock);
00302 if (!_alive) return BAD_INPUT;
00303 _last_add->reset();
00304 if (offset < 0) return BAD_INPUT;
00305 if (!whole()) return INCOMPLETE;
00306 int len = full_length(offset);
00307 if (len < 0) return outcomes(len);
00308 storage_location.reset(int(len));
00309 if (!storage_location.observe()) return OUT_OF_MEMORY;
00310
00311 int store_offset = 0;
00312
00313 for (int i = 0; i < _packet_list->length(); i++) {
00314 if (offset > _packet_list->get(i).length() - 1) return BAD_INPUT;
00315
00316 byte_array new_chunk = _packet_list->get(i).subarray(offset,
00317 _packet_list->get(i).length() - 1);
00318 storage_location.overwrite(store_offset, new_chunk);
00319
00320 store_offset += new_chunk.length();
00321 }
00322 return OKAY;
00323 }
00324
00325
00326 #endif //BUFFER_IMPLEMENTATION_FILE
00327