buffer.cpp

Go to the documentation of this file.
00001 #ifndef BUFFER_IMPLEMENTATION_FILE
00002 #define BUFFER_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : buffer                                                            *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 1992-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "buffer.h"
00019 #include "buffer_key.h"
00020 #include "implem_only.h"
00021 
00022 #include <basis/byte_array.h>
00023 #include <basis/mutex.h>
00024 #include <basis/set.cpp>
00025 #include <data_struct/bit_vector.h>
00026 #include <data_struct/span_manager.h>
00027 #include <mechanisms/time_stamp.h>
00028 
00030 
00031 // holds onto the packet information for the buffer.
00032 
00033 class buffer_pack_array : public array<byte_array>
00034 {
00035 public:
00036   buffer_pack_array(int num = 0) : array<byte_array>(num) {}
00037 };
00038 
00040 
00041 buffer::buffer(const buffer_key &key, byte_array &contents,
00042     int packet_size)
00043 : _lock(new mutex),
00044   _owner_alerted(false),
00045   _alive(true),
00046   _maximum_packet(packet_size),
00047   _key(new buffer_key(key)),
00048   _actual_storage(new byte_array),
00049   _is_storage_active(true),
00050   _last_add(new time_stamp()),
00051   _attachment(new byte_array()),
00052   _packet_list(new buffer_pack_array(number_of_packets
00053        (int(contents.length()), packet_size))),
00054   _buffer_status(new span_manager(number_of_packets(int(contents.length()),
00055       packet_size))),
00056   _estimation(0),
00057   _references(new int_set)
00058 {
00059   // snarf the contents out of the passed in array.
00060   _actual_storage->snarf(contents);
00061 
00062   // mark that all packets are in place already.
00063   int_array status_init(2);  // the range of the marking.
00064   status_init[0] = 0;
00065   status_init[1] = short(_packet_list->length() - 1);
00066   _buffer_status->update(status_init);  // state that whole buffer is in place.
00067 
00068   // loop through and point at places in the buffer.
00069   int offset = 0;
00070   for (int i = 0; i < _packet_list->length(); i++) {
00071     int len = packet_size;
00072     // last packet is a different length usually.
00073     if (i == _packet_list->length() - 1)
00074       len = last_packet_size(int(_actual_storage->length()), packet_size);
00075     // store the data.
00076     _packet_list->use(i).reset(len, _actual_storage->access() + offset);
00077     offset += len;  // update the place we're pointing at.
00078   }
00079 }
00080 
00081 buffer::buffer(const buffer_key &key, int total_packets, int maximum_packet)
00082 : _lock(new mutex),
00083   _owner_alerted(false),
00084   _alive(true),
00085   _maximum_packet(maximum_packet),
00086   _key(new buffer_key(key)),
00087   _actual_storage(new byte_array()),
00088   _is_storage_active(false),
00089   _last_add(new time_stamp()),
00090   _attachment(new byte_array()),
00091   _packet_list(new buffer_pack_array(total_packets)),
00092   _buffer_status(new span_manager(total_packets)),
00093   _estimation(0),
00094   _references(new int_set)
00095 {}
00096 
00097 buffer::~buffer()
00098 {
00099   release();
00100   WHACK(_attachment);
00101   WHACK(_buffer_status);
00102   WHACK(_key);
00103   WHACK(_last_add);
00104   WHACK(_packet_list);
00105   WHACK(_lock);
00106   WHACK(_references);
00107   WHACK(_actual_storage);  // done in release, but what the hey.
00108 }
00109 
00110 void buffer::reset_estimate() { _estimation = 0; }
00111 
00112 bool buffer::did_estimate() const { return !!_estimation; }
00113 
00114 int buffer::estimate() const { return _estimation; }
00115 
00116 bool buffer::alive() const { return _alive; }
00117 
00118 bool buffer::owner_alerted() const { return _owner_alerted; }
00119 
00120 void buffer::set_alerted(bool owner_alerted)
00121 { _owner_alerted = owner_alerted; }
00122 
00123 span_manager buffer::buffer_status() const
00124 { auto_synchronizer l(*_lock); return *_buffer_status; }
00125 
00126 int buffer::references() const
00127 { auto_synchronizer l(*_lock); return _references->elements(); }
00128 
00129 int_set buffer::get_references() const
00130 { auto_synchronizer l(*_lock); return *_references; }
00131 
00132 void buffer::add_reference(int conn_id)
00133 { auto_synchronizer l(*_lock); *_references += conn_id; }
00134 
00135 void buffer::remove_reference(int conn_id)
00136 { auto_synchronizer l(*_lock); *_references -= conn_id; }
00137 
00138 void buffer::set_last_add(const time_stamp &new_last_add)
00139 { auto_synchronizer l(*_lock); *_last_add = new_last_add; }
00140 
00141 time_stamp buffer::last_add() const
00142 { auto_synchronizer l(*_lock); return *_last_add; }
00143 
00144 buffer_key buffer::key() const
00145 { auto_synchronizer l(*_lock); return *_key; }
00146 
00147 void buffer::set_key(const buffer_key &to_set)
00148 { auto_synchronizer l(*_lock); *_key = to_set; }
00149 
00150 void buffer::get_attachment(byte_array &storage) const
00151 { auto_synchronizer l(*_lock); storage = *_attachment; }
00152 
00153 void buffer::set_attachment(const byte_array &new_attach)
00154 { auto_synchronizer l(*_lock); *_attachment = new_attach; }
00155 
00156 int buffer::packets() const
00157 { auto_synchronizer l(*_lock); return _packet_list->length(); }
00158 
00159 bool buffer::whole() const
00160 { auto_synchronizer l(*_lock); return _buffer_status->vector().whole(); }
00161 
00162 bool buffer::empty() const
00163 { auto_synchronizer l(*_lock); return _buffer_status->vector().empty(); }
00164 
00165 int_array buffer::packet_sizes() const
00166 {
00167   int_array to_return;
00168   auto_synchronizer l(*_lock);
00169   for (int i = 0; i < _packet_list->length(); i++)
00170     to_return += _packet_list->get(i).length();
00171   return to_return;
00172 }
00173 
00174 void buffer::release()
00175 {
00176   FUNCDEF("release");
00177   auto_synchronizer l(*_lock);
00178   _last_add->reset();
00179   for (int i = 0; i < _packet_list->length(); i++)
00180     _packet_list->use(i).reset();
00181   WHACK(_actual_storage);
00182   // reset all status indicators.
00183   _is_storage_active = false;  // no storage at all now.
00184   _key->connection = 0;
00185   _key->msg_id = 0;
00186   _packet_list->reset(0);
00187   _buffer_status->reset(0);
00188   _owner_alerted = false;
00189   _estimation = 0;  // reset our estimator.
00190   _alive = true;
00191 }
00192 
00193 istring buffer::text_form() const
00194 {
00195   auto_synchronizer l(*_lock);
00196   return istring(istring::SPRINTF, "[%d] size=%d alive=%s conn_id=%d "
00197       "msg_id=%d whole?=%s mod_time=%s", _key->buff_id.raw_id(),
00198       (whole()? full_length() : (_estimation? _estimation : -1) ),
00199       _alive? "true" : "false", _key->connection, _key->msg_id,
00200       whole()? "true":"false", _last_add->text_form().observe());
00201   // note that -1 is used here as a kludgy way of saying "has not been
00202   // determined yet".
00203 }
00204 
00205 istring buffer::detailed_form(bool really_detailed) const
00206 {
00207   FUNCDEF("detailed_form");
00208   auto_synchronizer l(*_lock);
00209   istring to_return(text_form() + istring(" "));
00210   to_return += _buffer_status->print_received_list();
00211   if (really_detailed) {
00212     to_return += " lengths: ";
00213     for (int i = 0; i < packets(); i++) {
00214       if (_buffer_status->vector().on(i))
00215         to_return += istring(istring::SPRINTF, "[%d=%d] ", i, 
00216             _packet_list->get(i).length());
00217       else to_return += istring(istring::SPRINTF, "[%d=empty] ", i);
00218     }
00219   }
00220   return to_return;
00221 }
00222 
00223 void buffer::release(int packet_number, release_style status_handling)
00224 {
00225   FUNCDEF("release [pack/style]");
00226   auto_synchronizer l(*_lock);
00227   if (!_alive) return;
00228   bounds_return(packet_number, 0, _packet_list->length() - 1, );
00229   if (_is_storage_active) return;  // illegal to release inside large message.
00230   _last_add->reset();  // reset the stamp.
00231   // clear out any thoughts that a packet resided in this index.
00232   if (status_handling == RESET_STATUS)
00233     _buffer_status->vector().clear(packet_number);
00234   _packet_list->use(packet_number).reset();
00235 }
00236 
00237 outcome buffer::store_packet(int packet_number, const byte_array &buffer)
00238 {
00239   FUNCDEF("store_packet");
00240   auto_synchronizer l(*_lock);
00241   if (!_alive) return BAD_INPUT;
00242   bounds_return(packet_number, 0, _packet_list->length() - 1, BAD_INPUT);
00243   if (_is_storage_active) return BAD_INPUT;
00244     // can't store into prealloced space.
00245   _last_add->reset();
00246   if (_buffer_status->vector().on(packet_number)) return IN_USE;
00247     // they can't overstore a packet; this is deemed an error.
00248   _packet_list->use(packet_number) = buffer;
00249   _buffer_status->vector().light(packet_number);  // this index is now in use.
00250   return OKAY;
00251 }
00252 
00253 byte_array buffer::get_packet(int packet_number) const
00254 {
00255   FUNCDEF("get_packet");
00256   auto_synchronizer l(*_lock);
00257   if (!_alive) return byte_array();
00258   bounds_return(packet_number, 0, _packet_list->length() - 1, byte_array());
00259   return _packet_list->get(packet_number);
00260 }
00261 
00262 int buffer::packet_length(int packet_number) const
00263 {
00264   FUNCDEF("packet_length");
00265   auto_synchronizer l(*_lock);
00266   if (!_alive) return 0;
00267   bounds_return(packet_number, 0, _packet_list->length() - 1, 0);
00268   return _packet_list->get(packet_number).length();
00269 }
00270 
00271 int buffer::full_length(int offset) const
00272 {
00273   FUNCDEF("full_length");
00274   auto_synchronizer l(*_lock);
00275   if (!_alive) return 0;
00276   if (offset < 0) return BAD_INPUT;
00277   int to_return = 0;
00278   // make sure all packets exist.
00279   if (!whole()) return INCOMPLETE;
00280   for (int i = 0; i < _packet_list->length(); i++) {
00281     // compute the size of the packet by ignoring the offset portion.
00282     int add_in = _packet_list->get(i).length() - offset;
00283     if (add_in < 0) return BAD_INPUT;
00284     to_return += add_in;
00285   }
00286   return to_return;
00287 }
00288 
00289 int buffer::guess_size(int formal(offset))
00290 {
00291   auto_synchronizer l(*_lock);
00292   // only recompute our guess if it's not already set up.
00293   if (!did_estimate())
00294     _estimation = _packet_list->length() * _maximum_packet;
00295   return _estimation;
00296 }
00297 
00298 outcome buffer::dump(byte_array &storage_location, int offset)
00299 {
00300   FUNCDEF("dump");
00301   auto_synchronizer l(*_lock);
00302   if (!_alive) return BAD_INPUT;
00303   _last_add->reset();
00304   if (offset < 0) return BAD_INPUT;
00305   if (!whole()) return INCOMPLETE;
00306   int len = full_length(offset);
00307   if (len < 0) return outcomes(len);
00308   storage_location.reset(int(len));  // make the storage area the right size.
00309   if (!storage_location.observe()) return OUT_OF_MEMORY;
00310 
00311   int store_offset = 0;  // where we're at inside the storage location.
00312   // loop through and snag the packs in the buffer.
00313   for (int i = 0; i < _packet_list->length(); i++) {
00314     if (offset > _packet_list->get(i).length() - 1) return BAD_INPUT;
00315       // this is a failure in the choice of offsets.
00316     byte_array new_chunk = _packet_list->get(i).subarray(offset, 
00317         _packet_list->get(i).length() - 1);
00318     storage_location.overwrite(store_offset, new_chunk);
00319     // update the place we're pointing at.
00320     store_offset += new_chunk.length();
00321   }
00322   return OKAY;
00323 }
00324 
00325 
00326 #endif //BUFFER_IMPLEMENTATION_FILE
00327 

Generated on Fri Aug 29 04:28:26 2008 for HOOPLE Libraries by  doxygen 1.5.1