cromp_common.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : cromp_common                                                      *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 // NOTES:
00016 // 
00017 //   for a cromp_common that is "normal", the base octopus will be used for
00018 // restoring infotons.
00019 //   for a dependent cromp_common with a singleton and preexisting socket,
00020 // the socket will be used for communications and the singleton octopus will
00021 // be used for restore().
00022 //
00023 //   there are a few tiers of methods here.  the lowest-level tier can be
00024 // called by any other functions except those in the lowest-level (so being on
00025 // tier A implies that a method may not call other methods in tier A, but being
00026 // on a tier X allows calling of all existent tiers X-1, X-2, ...).
00027 
00028 //   last verified that conditions stated in header about variables protected
00029 // by accumulator lock are true: 12/30/2002.
00030 
00031 #include "cromp_common.h"
00032 #include "cromp_transaction.h"
00033 
00034 #include <basis/byte_array.h>
00035 #include <basis/functions.h>
00036 #include <basis/astring.h>
00037 #include <basis/mutex.h>
00038 #include <crypto/rsa_crypto.h>
00039 #include <loggers/program_wide_logger.h>
00040 #include <octopus/entity_data_bin.h>
00041 #include <octopus/entity_defs.h>
00042 #include <octopus/infoton.h>
00043 #include <octopus/octopus.h>
00044 #include <octopus/tentacle.h>
00045 #include <octopus/unhandled_request.h>
00046 #include <sockets/internet_address.h>
00047 #include <sockets/machine_uid.h>
00048 #include <sockets/spocket.h>
00049 #include <sockets/tcpip_stack.h>
00050 #include <structures/static_memory_gremlin.h>
00051 #include <tentacles/encryption_infoton.h>
00052 #include <textual/byte_formatter.h>
00053 #include <timely/time_stamp.h>
00054 
00055 using namespace basis;
00056 using namespace crypto;
00057 using namespace loggers;
00058 using namespace octopi;
00059 using namespace sockets;
00060 using namespace structures;
00061 using namespace textual;
00062 using namespace timely;
00063 
00064 namespace cromp {
00065 
00066 //#define DEBUG_CROMP_COMMON
00067   // uncomment for debugging info.
00068 
00069 #undef LOG
00070 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
00071 
00072 const int STALENESS_PERIOD = 2 * MINUTE_ms;
00073   // if data sits in the buffer this long without us seeing more, we assume
00074   // it's gone stale.
00075 
00076 const int SEND_DELAY_TIME = 200;
00077   // if the send failed initially, we'll delay this long before trying again.
00078 
00079 const int DATA_AWAIT_SNOOZE = 80;
00080   // we sleep for this long while we await data.
00081 
00082 const int QUICK_CROMP_SNOOZE = 28;
00083   // we take a quick nap if we're looking for some data and it's not there
00084   // for us yet.
00085 
00086 const int CROMP_BUFFER_CHUNK_SIZE = 256 * KILOBYTE;
00087   // the initial allocation size for buffers.
00088 
00089 const int MAXIMUM_RECEIVES = 70;
00090   // the maximum number of receptions before we skip to next phase.
00091 
00092 const int MAXIMUM_SEND = 128 * KILOBYTE;
00093   // the largest chunk we try to send at a time.  we want to limit this
00094   // rather than continually asking the OS to consume a big transmission.
00095 
00096 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
00097   // this is how frequently we'll flush out items from our data bin that
00098   // are too old.
00099 
00100 const int cromp_common::HOSTCHOP = 6;
00101   // we take this many characters as the readable textual portion of the
00102   // hostname.
00103 
00104 double cromp_common::_bytes_sent_total = 0.0;
00105 double cromp_common::_bytes_received_total = 0.0;
00106 
00107   SAFE_STATIC_CONST(rsa_crypto, _hidden_localhost_only_key,
00108       (encryption_infoton::RSA_KEY_SIZE))
00109   const rsa_crypto &cromp_common::localhost_only_key() {
00110 #ifdef DEBUG_CROMP_COMMON
00111     FUNCDEF("localhost_only_key");
00112 #endif
00113     static bool initted = false;
00114 #ifdef DEBUG_CROMP_COMMON
00115     bool was_initted = initted;
00116 #endif
00117     initted = true;
00118 #ifdef DEBUG_CROMP_COMMON
00119     if (!was_initted)
00120       LOG("started creating localhost RSA key.");
00121 #endif
00122     const rsa_crypto &to_return = _hidden_localhost_only_key();
00123 #ifdef DEBUG_CROMP_COMMON
00124     if (!was_initted)
00125       LOG("done creating localhost RSA key.");
00126 #endif
00127     return to_return;
00128   }
00129 
00130 cromp_common::cromp_common(const astring &host, int max_per_ent)
00131 : _commlink(NIL),
00132   _octopus(new octopus(host, max_per_ent)),
00133   _singleton(NIL),
00134   _requests(new entity_data_bin(max_per_ent)),
00135   _accum_lock(new mutex),
00136   _last_data_seen(new time_stamp),
00137   _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00138   _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00139   _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00140   _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00141   _last_cleanup(new time_stamp)
00142 {
00143 //  FUNCDEF("constructor [host/max_per_ent]");
00144   // clear pre-existing space.
00145   _accumulator->reset();
00146   _sendings->reset();
00147   _receive_buffer->reset();
00148   _still_flat->reset();
00149 }
00150 
00151 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
00152 : _commlink(preexisting),
00153   _octopus(singleton),
00154   _singleton(singleton),
00155   _requests(new entity_data_bin(singleton?
00156       singleton->responses().max_bytes_per_entity()
00157           : DEFAULT_MAX_ENTITY_QUEUE)),
00158   _accum_lock(new mutex),
00159   _last_data_seen(new time_stamp),
00160   _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00161   _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00162   _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00163   _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00164   _last_cleanup(new time_stamp)
00165 {
00166   FUNCDEF("constructor [preexisting/singleton]");
00167   if (!_octopus) {
00168     // they passed us a bad singleton.  carry on as best we can.
00169     LOG("singleton passed as NIL; constructing new octopus instead.");
00170     internet_address local(internet_address::localhost(), "localhost", 0);
00171     _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
00172   }
00173   // clear pre-existing space.
00174   _accumulator->reset();
00175   _sendings->reset();
00176   _receive_buffer->reset();
00177   _still_flat->reset();
00178 }
00179 
00180 cromp_common::~cromp_common()
00181 {
00182 //  FUNCDEF("destructor");
00183   close_common();  // shuts down our socket and other stuff.
00184   if (_singleton) {
00185     _singleton = NIL;  // reset the pointer we had.
00186     _octopus = NIL;  // ditto.
00187   } else {
00188     // this one was ours so we need to clean it up.
00189     WHACK(_octopus);
00190   }
00191   WHACK(_accumulator);
00192   WHACK(_sendings);
00193   WHACK(_commlink);
00194   WHACK(_requests);
00195   WHACK(_last_cleanup);
00196   WHACK(_last_data_seen);
00197   WHACK(_receive_buffer);
00198   WHACK(_still_flat);
00199   WHACK(_accum_lock);
00200 }
00201 
00202 spocket *cromp_common::spock() const { return _commlink; }
00203 
00204 int cromp_common::default_port() { return 10008; }
00205 
00206 outcome cromp_common::add_tentacle(tentacle *to_add, bool filter)
00207 { return _octopus->add_tentacle(to_add, filter); }
00208 
00209 int cromp_common::pending_sends() const
00210 {
00211   auto_synchronizer l(*_accum_lock);
00212   return _sendings->length();
00213 }
00214 
00215 int cromp_common::accumulated_bytes() const
00216 {
00217   auto_synchronizer l(*_accum_lock);
00218   return _accumulator->length();
00219 }
00220 
00221 astring cromp_common::chew_hostname(const internet_address &addr,
00222     internet_address *resolved_form)
00223 {
00224 #ifdef DEBUG_CROMP_COMMON
00225   FUNCDEF("chew_hostname");
00226   LOG(astring("addr coming in ") + addr.text_form());
00227 #endif
00228   tcpip_stack stack;
00229   bool worked;
00230   internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
00231       worked);
00232   if (worked) {
00233     if (resolved_form) *resolved_form = res1;
00234 #ifdef DEBUG_CROMP_COMMON
00235     LOG(astring("resolved addr ") + res1.text_form());
00236 #endif
00237   } else {
00238 #ifdef DEBUG_CROMP_COMMON
00239     LOG(astring("failed to resolve host=") + addr.hostname);
00240 #endif
00241   }
00242 
00243   // get a readable form of the host.
00244   astring just_host = res1.normalize_host();
00245   while (just_host.length() < HOSTCHOP) just_host += "-";  // filler.
00246   machine_uid converted = res1.convert();
00247   astring to_return = just_host.substring(0, HOSTCHOP - 1);
00248   to_return += converted.compact_form();
00249 
00250 #ifdef DEBUG_CROMP_COMMON
00251   LOG(astring("returning machid ") + converted.text_form() + ", packed as "
00252       + parser_bits::platform_eol_to_chars()
00253       + byte_formatter::text_dump((abyte *)to_return.s(),
00254             to_return.length() + 1));
00255 #endif
00256 
00257   return to_return;
00258 }
00259 
00260 astring cromp_common::responses_text_form() const
00261 { return _requests->text_form(); }
00262 
00263 internet_address cromp_common::other_side() const
00264 {
00265   if (!_commlink) return internet_address();
00266   return _commlink->where();
00267 }
00268 
00269 int cromp_common::max_bytes_per_entity() const
00270 { return _requests->max_bytes_per_entity(); }
00271 
00272 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
00273 {
00274   _requests->max_bytes_per_entity(max_bytes_per_entity);
00275   _octopus->responses().max_bytes_per_entity(max_bytes_per_entity);
00276 }
00277 
00278 void cromp_common::conditional_cleaning()
00279 {
00280 //  FUNCDEF("conditional_cleaning");
00281   if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
00282     _requests->clean_out_deadwood();
00283       // flush any items that are too old.
00284     _last_cleanup->reset();
00285       // record that we just cleaned up.
00286   }
00287 }
00288 
00289 outcome cromp_common::open_common(const internet_address &where)
00290 {
00291 #ifdef DEBUG_CROMP_COMMON
00292   FUNCDEF("open_common");
00293 #endif
00294   if (_singleton && _commlink)
00295     return OKAY;  // done if this uses pre-existing objects.
00296 
00297   if (_commlink) WHACK(_commlink);  // clean up any pre-existing socket.
00298 
00299   internet_address other_side = where;
00300 
00301 #ifdef DEBUG_CROMP_COMMON
00302   LOG(astring("opening at ") + other_side.text_form());
00303 #endif
00304   _commlink = new spocket(other_side);
00305 //hmmm: check socket health.
00306 
00307   return OKAY;
00308 }
00309 
00310 outcome cromp_common::close_common()
00311 {
00312   if (_commlink) _commlink->disconnect();  // make the thread stop bothering.
00313   return OKAY;
00314 }
00315 
00316 const char *cromp_common::outcome_name(const outcome &to_name)
00317 {
00318   switch (to_name.value()) {
00319     case TOO_FULL: return "TOO_FULL";
00320     case PARTIAL: return "PARTIAL";
00321     default: return communication_commons::outcome_name(to_name);
00322   }
00323 }
00324 
00325 outcome cromp_common::pack_and_ship(const infoton_list &requests,
00326     int max_tries)
00327 {
00328   FUNCDEF("pack_and_ship [multiple]");
00329   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
00330   conditional_cleaning();
00331   {
00332     auto_synchronizer l(*_accum_lock);  // lock while packing.
00333     for (int i = 0; i < requests.elements(); i++) {
00334       if (!requests[i] || !requests[i]->_data) {
00335         // this is a screw-up by someone.
00336         LOG("error in infoton_list; missing data element.");
00337         continue;
00338       }
00339       cromp_transaction::flatten(*_sendings, *requests[i]->_data,
00340           requests[i]->_id);
00341     }
00342   }
00343 
00344   return push_outgoing(max_tries);
00345 }
00346 
00347 bool cromp_common::buffer_clog(int max_buff) const
00348 {
00349   auto_synchronizer l(*_accum_lock);
00350   return _sendings->length() >= max_buff;
00351 }
00352 
00353 outcome cromp_common::pack_and_ship(const infoton &request,
00354     const octopus_request_id &item_id, int max_tries)
00355 {
00356 #ifdef DEBUG_CROMP_COMMON
00357   FUNCDEF("pack_and_ship [single]");
00358 #endif
00359   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
00360   conditional_cleaning();
00361 
00362 #ifdef DEBUG_CROMP_COMMON
00363   LOG(astring("sending req ") + item_id.mangled_form());
00364 #endif
00365 
00366   {
00367     auto_synchronizer l(*_accum_lock);  // lock while packing.
00368     cromp_transaction::flatten(*_sendings, request, item_id);
00369   }
00370 
00371   return push_outgoing(max_tries);
00372 }
00373 
00374 outcome cromp_common::push_outgoing(int max_tries)
00375 {
00376   FUNCDEF("push_outgoing");
00377 
00378   if (!max_tries) return cromp_common::OKAY;
00379     // no tries means we're done already.
00380 
00381   grab_anything(false);  // suck any data in that happens to be waiting.
00382 
00383   outcome to_return = cromp_common::TOO_FULL;
00384   int attempts = 0;
00385   while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
00386     to_return = send_buffer();
00387     grab_anything(false);  // suck any data in that happens to be waiting.
00388     if (to_return == cromp_common::OKAY)
00389       break;  // happy returns.
00390     if (to_return == cromp_common::PARTIAL) {
00391       // we sent all we tried to but there's more left.
00392       attempts = 0;  // skip back since we had a successful attempt.
00393       to_return = cromp_common::TOO_FULL;
00394         // reset so that we treat this by staying in the send loop.
00395       continue;  // jump back without waiting.
00396     }
00397     if (to_return == cromp_common::TOO_FULL) {
00398       // we can't send any more yet so delay for a bit to see if we can get
00399       // some more out.
00400       time_stamp stop_pausing(SEND_DELAY_TIME);
00401       while (time_stamp() < stop_pausing) {
00402 LOG("into too full looping...");
00403         if (!_commlink->connected()) break;
00404         grab_anything(true);  // suck any data in that happens to be waiting.
00405         // snooze a bit until we think we can write again.
00406         outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
00407         if (ret != spocket::NONE_READY)
00408           break;
00409       }
00410     } else {
00411       LOG(astring("failed send: ") + cromp_common::outcome_name(to_return));
00412       break;
00413     }
00414   }
00415   return to_return;
00416 }
00417 
00418 // rules for send_buffer: this function is in the lowest-level tier for using
00419 // the spocket.  it is allowed to be called by anyone.  it must not call any
00420 // other functions on the cromp_common class.
00421 outcome cromp_common::send_buffer()
00422 {
00423 #ifdef DEBUG_CROMP_COMMON
00424   FUNCDEF("send_buffer");
00425 #endif
00426   auto_synchronizer l(*_accum_lock);
00427 
00428   // all done if nothing to send.
00429   if (!_sendings->length())
00430     return OKAY;
00431 
00432   int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
00433 #ifdef DEBUG_CROMP_COMMON
00434 //  LOG(a_sprintf("sending %d bytes on socket %d.", size_to_send,
00435 //      _commlink->OS_socket()));
00436 #endif
00437   int len_sent = 0;
00438   outcome to_return;
00439   outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
00440       len_sent);
00441   switch (send_ret.value()) {
00442     case spocket::OKAY: {
00443       // success.
00444 #ifdef DEBUG_CROMP_COMMON
00445 //      LOG(a_sprintf("really sent %d bytes on socket %d.", len_sent,
00446 //          _commlink->OS_socket()));
00447 #endif
00448       _bytes_sent_total += len_sent;
00449       to_return = OKAY;
00450       break;
00451     }
00452     case spocket::PARTIAL: {
00453       // got something done hopefully.
00454 #ifdef DEBUG_CROMP_COMMON
00455       LOG(a_sprintf("partial send of %d bytes (of %d desired) on socket %d.",
00456           len_sent, size_to_send, _commlink->OS_socket()));
00457 #endif
00458       _bytes_sent_total += len_sent;
00459       to_return = PARTIAL;
00460       break;
00461     }
00462     case spocket::NONE_READY: {
00463       // did nothing useful.
00464 #ifdef DEBUG_CROMP_COMMON
00465       LOG(a_sprintf("too full to send any on socket %d.",
00466           _commlink->OS_socket()));
00467 #endif
00468       len_sent = 0;  // reset just in case.
00469       to_return = TOO_FULL;
00470       break;
00471     }
00472     default: {
00473       // other things went wrong.
00474 #ifdef DEBUG_CROMP_COMMON
00475       LOG(astring("failing send with ") + spocket::outcome_name(send_ret));
00476 #endif
00477       len_sent = 0;  // reset just in case.
00478 
00479 //hmmm: these are unnecessary now since it's the same set of outcomes.
00480       if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
00481       else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
00482 //any other ideas?
00483       else to_return = DISALLOWED;
00484 
00485 #ifdef DEBUG_CROMP_COMMON
00486       LOG(astring("failed to send--got error ") + outcome_name(to_return));
00487 #endif
00488       break;
00489     }
00490   }
00491 
00492   if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
00493     // accomodate our latest activity on the socket.
00494     _sendings->zap(0, len_sent - 1);  // sent just some of it.
00495   }
00496 
00497   return to_return;
00498 }
00499 
00500 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
00501     infoton * &item, octopus_request_id &req_id, int timeout)
00502 {
00503 //  FUNCDEF("retrieve_and_restore_root");
00504   item = NIL;
00505   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
00506   octopus_request_id tmp_id;
00507   time_stamp leaving_time(timeout);
00508 
00509   conditional_cleaning();
00510 
00511   do {
00512     // check if it's already in the bin from someone else grabbing it.
00513     if (get_anything)
00514       item = _requests->acquire_for_any(req_id);
00515     else
00516       item = _requests->acquire_for_identifier(req_id);
00517     if (item)
00518       return OKAY;
00519 
00520     // check to see if there's any data.
00521     grab_anything(timeout? true : false);
00522 
00523     push_outgoing(1);
00524 //hmmm: parameterize the push?
00525 
00526     // check again just to make sure.  this is before we check the timeout,
00527     // since we could squeak in with something before that.
00528     if (get_anything)
00529       item = _requests->acquire_for_any(req_id);
00530     else
00531       item = _requests->acquire_for_identifier(req_id);
00532     if (item)
00533       return OKAY;
00534 
00535     if (!timeout) return TIMED_OUT;
00536       // timeout is not set so we leave right away.
00537 
00538     if (!_commlink->connected()) return NO_CONNECTION;
00539 
00540     // keep going if we haven't seen it yet and still have time.
00541   } while (time_stamp() < leaving_time);
00542   return TIMED_OUT;
00543 }
00544 
00545 outcome cromp_common::retrieve_and_restore(infoton * &item,
00546     const octopus_request_id &req_id_in, int timeout)
00547 {
00548   octopus_request_id req_id = req_id_in;
00549   return retrieve_and_restore_root(false, item, req_id, timeout);
00550 }
00551 
00552 outcome cromp_common::retrieve_and_restore_any(infoton * &item,
00553     octopus_request_id &req_id, int timeout)
00554 { return retrieve_and_restore_root(true, item, req_id, timeout); }
00555 
00556 // rules: snarf_from_socket is in the second lowest-level tier.  it must not
00557 // call any other functions on cromp_common besides the send_buffer and
00558 // process_accumulator methods.
00559 void cromp_common::snarf_from_socket(bool wait)
00560 {
00561 #ifdef DEBUG_CROMP_COMMON
00562   FUNCDEF("snarf_from_socket");
00563 #endif
00564   if (wait) {
00565 #ifdef DEBUG_CROMP_COMMON
00566 //    LOG(a_sprintf("awaiting rcptblty on socket %d.", _commlink->OS_socket()));
00567 #endif
00568     // snooze until data seems ready for chewing or until we time out.
00569     time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
00570     while (time_stamp() < stop_pausing) {
00571       if (!_commlink->connected()) return;
00572       outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
00573       if (wait_ret != spocket::NONE_READY)
00574         break;
00575       send_buffer();  // push out some data in between.
00576     }
00577   }
00578 
00579   outcome rcv_ret = spocket::OKAY;
00580   // this loop scrounges as much data as possible, within limits.
00581   int receptions = 0;
00582   while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
00583     int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
00584     {
00585       auto_synchronizer l(*_accum_lock);
00586       _receive_buffer->reset();  // clear pre-existing junk.
00587       rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
00588 #ifdef DEBUG_CROMP_COMMON
00589       if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00590         LOG(a_sprintf("received %d bytes on socket %d", rcv_size,
00591             _commlink->OS_socket()));
00592       } else if (rcv_ret != spocket::NONE_READY) {
00593         LOG(a_sprintf("no data on sock %d--outcome=", _commlink->OS_socket())
00594             + spocket::outcome_name(rcv_ret));
00595       }
00596 #endif
00597       if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00598         // we got some data from the receive, so store it.
00599         _bytes_received_total += _receive_buffer->length();
00600         *_accumulator += *_receive_buffer;  // add to overall accumulator.
00601         _last_data_seen->reset();
00602       }
00603     }
00604 
00605     send_buffer();
00606       // force data to go out also.
00607   }
00608 }
00609 
00610 void cromp_common::grab_anything(bool wait)
00611 {
00612   snarf_from_socket(wait);  // get any data that's waiting.
00613   process_accumulator();  // retrieve any commands we see.
00614 }
00615 
00616 #define CHECK_STALENESS \
00617   if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
00618     LOG("would resynch data due to staleness."); \
00619     _accumulator->zap(0, 0);  /* roast first byte */ \
00620     cromp_transaction::resynchronize(*_accumulator); \
00621     _last_data_seen->reset(); \
00622     continue; \
00623   }
00624 
00625 // process_accumulator should do nothing besides chewing on the buffer.
00626 // this puts it in the lowest-level tier.
00627 
00628 void cromp_common::process_accumulator()
00629 {
00630   FUNCDEF("process_accumulator");
00631   infoton *item = NIL;
00632   octopus_request_id req_id;
00633 
00634   string_array clas;
00635 
00636   if (!_accumulator->length()) return;
00637 
00638   // a little gymnastics to get a large buffer on the first try.
00639   byte_array temp_chow_buffer(CROMP_BUFFER_CHUNK_SIZE, NIL);
00640   temp_chow_buffer.reset();
00641 
00642   int cmds_found = 0;
00643 
00644   while (_accumulator->length()) {
00645 LOG(a_sprintf("eating command %d", cmds_found++));
00646     {
00647       // first block tries to extract data from the accumulator.
00648       auto_synchronizer l(*_accum_lock);
00649       // there are some contents; let's look at them.
00650       int packed_length = 0;
00651       outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
00652           packed_length);
00653       if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
00654           || (peek_ret == cromp_transaction::PARTIAL) ) {
00655         // not ready yet.
00656         CHECK_STALENESS;
00657         return;
00658       } else if (peek_ret != cromp_transaction::OKAY) {
00659         LOG(astring("error unpacking--peek error=")
00660             + cromp_transaction::outcome_name(peek_ret));
00661         // try to get to a real command.
00662         _accumulator->zap(0, 0);  // roast first byte.
00663         if (cromp_transaction::resynchronize(*_accumulator)) continue;
00664         return;
00665       }
00666 
00667 #ifdef DEBUG_CROMP_COMMON
00668       LOG("seeing command ready");
00669 #endif
00670       // temp buffer for undoing cromp transaction.
00671       if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
00672         LOG("failed to unpack even though peek was happy!");
00673         // try to get to a real command.
00674         _accumulator->zap(0, 0);  // roast first byte.
00675         if (cromp_transaction::resynchronize(*_accumulator)) continue;
00676         return;
00677       }
00678 #ifdef DEBUG_CROMP_COMMON
00679       LOG(astring("got req id of ") + req_id.mangled_form());
00680 #endif
00681 
00682       // now unwrap the onion a bit more to find the real object being sent.
00683       if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
00684         // try to resynch on transaction boundary.
00685         LOG("failed to get back a packed infoton!");
00686         _accumulator->zap(0, 0);  // roast first byte.
00687         if (cromp_transaction::resynchronize(*_accumulator)) continue;
00688         return;
00689       }
00690 #ifdef DEBUG_CROMP_COMMON
00691       LOG(astring("got classifier of ") + clas.text_form());
00692 #endif
00693     } // end of protected area.
00694 
00695     // restore the infoton from the packed form.
00696     outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
00697     if (rest_ret != tentacle::OKAY) {
00698 #ifdef DEBUG_CROMP_COMMON
00699       LOG(astring("our octopus couldn't restore the packed data! ")
00700           + outcome_name(rest_ret));
00701 #endif
00702       // publish an unhandled request back to the requestor.
00703       _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
00704           req_id);
00705     } else {
00706       // we finally have reached a point where we have a valid infoton.
00707       if (_requests->add_item(item, req_id))
00708         cmds_found++;
00709 #ifdef DEBUG_CROMP_COMMON
00710       else
00711         LOG("failed to add item to bin due to space constraints.");
00712 #endif
00713     }
00714 LOG(a_sprintf("ate command %d", cmds_found));
00715   }
00717 }
00718 
00719 bool cromp_common::decode_host(const astring &coded_host, astring &hostname,
00720     machine_uid &machine)
00721 {
00722   if (coded_host.length() < HOSTCHOP) return false;  // not big enough.
00723   hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
00724   const astring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
00725       coded_host.length() - 1);
00726   machine = machine_uid::expand(compact_uid);
00727   if (!machine.valid()) return false;
00728   return true;
00729 }
00730 
00731 } //namespace.
00732 
Generated on Sat Jan 28 04:22:42 2012 for hoople2 project by  doxygen 1.6.3