cromp_common.cpp

Go to the documentation of this file.
00001 #ifndef CROMP_COMMON_IMPLEMENTATION_FILE
00002 #define CROMP_COMMON_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : cromp_common                                                      *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 // NOTES:
00019 // 
00020 //   for a cromp_common that is "normal", the base octopus will be used for
00021 // restoring infotons.
00022 //   for a dependent cromp_common with a singleton and preexisting socket,
00023 // the socket will be used for communications and the singleton octopus will
00024 // be used for restore().
00025 //
00026 //   there are a few tiers of methods here.  the lowest-level tier can be
00027 // called by any other functions except those in the lowest-level (so being on
00028 // tier A implies that a method may not call other methods in tier A, but being
00029 // on a tier X allows calling of all existent tiers X-1, X-2, ...).
00030 
00031 //   last verified that conditions stated in header about variables protected
00032 // by accumulator lock are true: 12/30/2002.
00033 
00034 #include "cromp_common.h"
00035 #include "cromp_transaction.h"
00036 
00037 #include <basis/byte_array.h>
00038 #include <basis/function.h>
00039 #include <basis/istring.h>
00040 #include <basis/log_base.h>
00041 #include <basis/mutex.h>
00042 #include <basis/portable.h>
00043 #include <data_struct/static_memory_gremlin.h>
00044 #include <mechanisms/time_stamp.h>
00045 #include <octopus/entity_data_bin.h>
00046 #include <octopus/entity_defs.h>
00047 #include <octopus/infoton.h>
00048 #include <octopus/octopus.h>
00049 #include <octopus/tentacle.h>
00050 #include <octopus/unhandled_request.h>
00051 #include <loggers/file_logger.h>
00052 #include <sockets/address.h>
00053 #include <sockets/machine_uid.h>
00054 #include <sockets/spocket.h>
00055 #include <sockets/tcpip_stack.h>
00056 #include <textual/byte_format.h>
00057 
00058 //#define DEBUG_CROMP_COMMON
00059   // uncomment for debugging info.
00060 
00061 #undef LOG
00062 #ifdef DEBUG_CROMP_COMMON
00063   // since the transaction stuff is so low-level, we risk a feedback loop if
00064   // we log stuff when the program wide logger is itself a communication
00065   // object.
00066   #define LOG(s) CLASS_EMERGENCY_LOG(file_logger(portable::env_string("TMP") + "/cromp_common.log"), s)
00067 #else
00068   #define LOG(s)
00069 #endif
00070 
00071 const int STALENESS_PERIOD = 2 * MINUTE_ms;
00072   // if data sits in the buffer this long without us seeing more, we assume
00073   // it's gone stale.
00074 
00075 const int SEND_DELAY_TIME = 200;
00076   // if the send failed initially, we'll delay this long before trying again.
00077 
00078 const int DATA_AWAIT_SNOOZE = 80;
00079   // we sleep for this long while we await data.
00080 
00081 const int QUICK_CROMP_SNOOZE = 28;
00082   // we take a quick nap if we're looking for some data and it's not there
00083   // for us yet.
00084 
00085 const int CROMP_BUFFER_CHUNK_SIZE = 256 * KILOBYTE;
00086   // the initial allocation size for buffers.
00087 
00088 const int MAXIMUM_RECEIVES = 70;
00089   // the maximum number of receptions before we skip to next phase.
00090 
00091 const int MAXIMUM_SEND = 128 * KILOBYTE;
00092   // the largest chunk we try to send at a time.  we want to limit this
00093   // rather than continually asking the OS to consume a big transmission.
00094 
00095 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
00096   // this is how frequently we'll flush out items from our data bin that
00097   // are too old.
00098 
00099 const int cromp_common::HOSTCHOP = 6;
00100   // we take this many characters as the readable textual portion of the
00101   // hostname.
00102 
00103 double cromp_common::_bytes_sent_total = 0.0;
00104 double cromp_common::_bytes_received_total = 0.0;
00105 
00106 #ifndef OMIT_CRYPTO_SUPPORT
00107   #include <crypto/rsa_crypto.h>
00108   #include <tentacles/encryption_infoton.h>
00109   SAFE_STATIC_CONST(RSA_crypto, _hidden_localhost_only_key,
00110       (encryption_infoton::RSA_KEY_SIZE))
00111   const RSA_crypto &cromp_common::localhost_only_key() {
00112     FUNCDEF("localhost_only_key");
00113     static bool initted = false;
00114 #ifdef DEBUG_CROMP_COMMON
00115     bool was_initted = initted;
00116 #endif
00117     initted = true;
00118 #ifdef DEBUG_CROMP_COMMON
00119     if (!was_initted)
00120       LOG("started creating localhost RSA key.");
00121 #endif
00122     const RSA_crypto &to_return = _hidden_localhost_only_key();
00123 #ifdef DEBUG_CROMP_COMMON
00124     if (!was_initted)
00125       LOG("done creating localhost RSA key.");
00126 #endif
00127     return to_return;
00128   }
00129 #endif
00130 
00131 cromp_common::cromp_common(const istring &host, int max_per_ent)
00132 : _commlink(NIL),
00133   _octopus(new octopus(host, max_per_ent)),
00134   _singleton(NIL),
00135   _requests(new entity_data_bin(max_per_ent)),
00136   _accum_lock(new mutex),
00137   _last_data_seen(new time_stamp),
00138   _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00139   _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00140   _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00141   _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00142   _last_cleanup(new time_stamp)
00143 {
00144   FUNCDEF("constructor [host/max_per_ent]");
00145   // clear pre-existing space.
00146   _accumulator->reset();
00147   _sendings->reset();
00148   _receive_buffer->reset();
00149   _still_flat->reset();
00150 }
00151 
00152 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
00153 : _commlink(preexisting),
00154   _octopus(singleton),
00155   _singleton(singleton),
00156   _requests(new entity_data_bin(singleton?
00157       singleton->responses().max_bytes_per_entity()
00158           : DEFAULT_MAX_ENTITY_QUEUE)),
00159   _accum_lock(new mutex),
00160   _last_data_seen(new time_stamp),
00161   _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00162   _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00163   _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00164   _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00165   _last_cleanup(new time_stamp)
00166 {
00167   FUNCDEF("constructor [preexisting/singleton]");
00168   if (!_octopus) {
00169     // they passed us a bad singleton.  carry on as best we can.
00170     LOG("singleton passed as NIL; constructing new octopus instead.");
00171     internet_address local(internet_address::localhost(), "localhost", 0);
00172     _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
00173   }
00174   // clear pre-existing space.
00175   _accumulator->reset();
00176   _sendings->reset();
00177   _receive_buffer->reset();
00178   _still_flat->reset();
00179 }
00180 
00181 cromp_common::~cromp_common()
00182 {
00183   FUNCDEF("destructor");
00184   close_common();  // shuts down our socket and other stuff.
00185   if (_singleton) {
00186     _singleton = NIL;  // reset the pointer we had.
00187     _octopus = NIL;  // ditto.
00188   } else {
00189     // this one was ours so we need to clean it up.
00190     WHACK(_octopus);
00191   }
00192   WHACK(_accumulator);
00193   WHACK(_sendings);
00194   WHACK(_commlink);
00195   WHACK(_requests);
00196   WHACK(_last_cleanup);
00197   WHACK(_last_data_seen);
00198   WHACK(_receive_buffer);
00199   WHACK(_still_flat);
00200   WHACK(_accum_lock);
00201 }
00202 
00203 spocket *cromp_common::spock() const { return _commlink; }
00204 
00205 int cromp_common::default_port() { return 10008; }
00206 
00207 outcome cromp_common::add_tentacle(tentacle *to_add, bool filter)
00208 { return _octopus->add_tentacle(to_add, filter); }
00209 
00210 int cromp_common::pending_sends() const
00211 {
00212   auto_synchronizer l(*_accum_lock);
00213   return _sendings->length();
00214 }
00215 
00216 int cromp_common::accumulated_bytes() const
00217 {
00218   auto_synchronizer l(*_accum_lock);
00219   return _accumulator->length();
00220 }
00221 
00222 istring cromp_common::chew_hostname(const internet_address &addr,
00223     internet_address *resolved_form)
00224 {
00225   FUNCDEF("chew_hostname");
00226 #ifdef DEBUG_CROMP_COMMON
00227   LOG(istring("addr coming in ") + addr.text_form());
00228 #endif
00229   tcpip_stack stack;
00230   bool worked;
00231   internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
00232       worked);
00233   if (worked) {
00234     if (resolved_form) *resolved_form = res1;
00235 #ifdef DEBUG_CROMP_COMMON
00236     LOG(istring("resolved addr ") + res1.text_form());
00237 #endif
00238   } else {
00239 #ifdef DEBUG_CROMP_COMMON
00240     LOG(istring("failed to resolve host=") + addr.hostname);
00241 #endif
00242   }
00243 
00244   // get a readable form of the host.
00245   istring just_host = res1.normalize_host();
00246   while (just_host.length() < HOSTCHOP) just_host += "-";  // filler.
00247   machine_uid converted = res1.convert();
00248   istring to_return = just_host.substring(0, HOSTCHOP - 1);
00249   to_return += converted.compact_form();
00250 
00251 #ifdef DEBUG_CROMP_COMMON
00252   LOG(istring("returning machid ") + converted.text_form() + ", packed as "
00253       + log_base::platform_ending()
00254       + byte_format::text_dump((byte *)to_return.s(),
00255             to_return.length() + 1));
00256 #endif
00257 
00258   return to_return;
00259 }
00260 
00261 istring cromp_common::responses_text_form() const
00262 { return _requests->text_form(); }
00263 
00264 internet_address cromp_common::other_side() const
00265 {
00266   if (!_commlink) return internet_address();
00267   return _commlink->where();
00268 }
00269 
00270 int cromp_common::max_bytes_per_entity() const
00271 { return _requests->max_bytes_per_entity(); }
00272 
00273 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
00274 {
00275   _requests->max_bytes_per_entity(max_bytes_per_entity);
00276   _octopus->responses().max_bytes_per_entity(max_bytes_per_entity);
00277 }
00278 
00279 void cromp_common::conditional_cleaning()
00280 {
00281   FUNCDEF("conditional_cleaning");
00282   if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
00283     _requests->clean_out_deadwood();
00284       // flush any items that are too old.
00285     _last_cleanup->reset();
00286       // record that we just cleaned up.
00287   }
00288 }
00289 
00290 outcome cromp_common::open_common(const internet_address &where)
00291 {
00292   FUNCDEF("open_common");
00293   if (_singleton && _commlink)
00294     return OKAY;  // done if this uses pre-existing objects.
00295 
00296   if (_commlink) WHACK(_commlink);  // clean up any pre-existing socket.
00297 
00298   internet_address other_side = where;
00299 
00300 #ifdef DEBUG_CROMP_COMMON
00301   LOG(istring("opening at ") + other_side.text_form());
00302 #endif
00303   _commlink = new spocket(other_side);
00304 //hmmm: check socket health.
00305 
00306   return OKAY;
00307 }
00308 
00309 outcome cromp_common::close_common()
00310 {
00311   if (_commlink) _commlink->disconnect();  // make the thread stop bothering.
00312   return OKAY;
00313 }
00314 
00315 const char *cromp_common::outcome_name(const outcome &to_name)
00316 {
00317   switch (to_name.value()) {
00318     case TOO_FULL: return "TOO_FULL";
00319     case PARTIAL: return "PARTIAL";
00320     default: return communication_commons::outcome_name(to_name);
00321   }
00322 }
00323 
00324 outcome cromp_common::pack_and_ship(const infoton_list &requests,
00325     int max_tries)
00326 {
00327   FUNCDEF("pack_and_ship [multiple]");
00328   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
00329   conditional_cleaning();
00330   {
00331     auto_synchronizer l(*_accum_lock);  // lock while packing.
00332     for (int i = 0; i < requests.elements(); i++) {
00333       if (!requests[i] || !requests[i]->_data) {
00334         // this is a screw-up by someone.
00335         LOG("error in infoton_list; missing data element.");
00336         continue;
00337       }
00338       cromp_transaction::flatten(*_sendings, *requests[i]->_data,
00339           requests[i]->_id);
00340     }
00341   }
00342 
00343   return push_outgoing(max_tries);
00344 }
00345 
00346 bool cromp_common::buffer_clog(int max_buff) const
00347 {
00348   auto_synchronizer l(*_accum_lock);
00349   return _sendings->length() >= max_buff;
00350 }
00351 
00352 outcome cromp_common::pack_and_ship(const infoton &request,
00353     const octopus_request_id &item_id, int max_tries)
00354 {
00355   FUNCDEF("pack_and_ship [single]");
00356   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
00357   conditional_cleaning();
00358 
00359 #ifdef DEBUG_CROMP_COMMON
00360   LOG(istring("sending req ") + item_id.mangled_form());
00361 #endif
00362 
00363   {
00364     auto_synchronizer l(*_accum_lock);  // lock while packing.
00365     cromp_transaction::flatten(*_sendings, request, item_id);
00366   }
00367 
00368   return push_outgoing(max_tries);
00369 }
00370 
00371 outcome cromp_common::push_outgoing(int max_tries)
00372 {
00373   FUNCDEF("push_outgoing");
00374 
00375   if (!max_tries) return cromp_common::OKAY;
00376     // no tries means we're done already.
00377 
00378   grab_anything(false);  // suck any data in that happens to be waiting.
00379 
00380   outcome to_return = cromp_common::TOO_FULL;
00381   int attempts = 0;
00382   while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
00383     to_return = send_buffer();
00384     grab_anything(false);  // suck any data in that happens to be waiting.
00385     if (to_return == cromp_common::OKAY)
00386       break;  // happy returns.
00387     if (to_return == cromp_common::PARTIAL) {
00388       // we sent all we tried to but there's more left.
00389       attempts = 0;  // skip back since we had a successful attempt.
00390       to_return = cromp_common::TOO_FULL;
00391         // reset so that we treat this by staying in the send loop.
00392       continue;  // jump back without waiting.
00393     }
00394     if (to_return == cromp_common::TOO_FULL) {
00395       // we can't send any more yet so delay for a bit to see if we can get
00396       // some more out.
00397       time_stamp stop_pausing(SEND_DELAY_TIME);
00398       while (time_stamp() < stop_pausing) {
00399 LOG("into too full looping...");
00400         if (!_commlink->connected()) break;
00401         grab_anything(true);  // suck any data in that happens to be waiting.
00402         // snooze a bit until we think we can write again.
00403         outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
00404         if (ret != spocket::NONE_READY)
00405           break;
00406       }
00407     } else {
00408       LOG(istring("failed send: ") + cromp_common::outcome_name(to_return));
00409       break;
00410     }
00411   }
00412   return to_return;
00413 }
00414 
00415 // rules for send_buffer: this function is in the lowest-level tier for using
00416 // the spocket.  it is allowed to be called by anyone.  it must not call any
00417 // other functions on the cromp_common class.
00418 outcome cromp_common::send_buffer()
00419 {
00420   FUNCDEF("send_buffer");
00421   auto_synchronizer l(*_accum_lock);
00422 
00423   // all done if nothing to send.
00424   if (!_sendings->length())
00425     return OKAY;
00426 
00427   int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
00428 #ifdef DEBUG_CROMP_COMMON
00429 //  LOG(isprintf("sending %d bytes on socket %d.", size_to_send,
00430 //      _commlink->OS_socket()));
00431 #endif
00432   int len_sent = 0;
00433   outcome to_return;
00434   outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
00435       len_sent);
00436   switch (send_ret.value()) {
00437     case spocket::OKAY: {
00438       // success.
00439 #ifdef DEBUG_CROMP_COMMON
00440 //      LOG(isprintf("really sent %d bytes on socket %d.", len_sent,
00441 //          _commlink->OS_socket()));
00442 #endif
00443       _bytes_sent_total += len_sent;
00444       to_return = OKAY;
00445       break;
00446     }
00447     case spocket::PARTIAL: {
00448       // got something done hopefully.
00449 #ifdef DEBUG_CROMP_COMMON
00450       LOG(isprintf("partial send of %d bytes (of %d desired) on socket %d.",
00451           len_sent, size_to_send, _commlink->OS_socket()));
00452 #endif
00453       _bytes_sent_total += len_sent;
00454       to_return = PARTIAL;
00455       break;
00456     }
00457     case spocket::NONE_READY: {
00458       // did nothing useful.
00459 #ifdef DEBUG_CROMP_COMMON
00460       LOG(isprintf("too full to send any on socket %d.",
00461           _commlink->OS_socket()));
00462 #endif
00463       len_sent = 0;  // reset just in case.
00464       to_return = TOO_FULL;
00465       break;
00466     }
00467     default: {
00468       // other things went wrong.
00469 #ifdef DEBUG_CROMP_COMMON
00470       LOG(istring("failing send with ") + spocket::outcome_name(send_ret));
00471 #endif
00472       len_sent = 0;  // reset just in case.
00473 
00474 //hmmm: these are unnecessary now since it's the same set of outcomes.
00475       if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
00476       else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
00477 //any other ideas?
00478       else to_return = DISALLOWED;
00479 
00480 #ifdef DEBUG_CROMP_COMMON
00481       LOG(istring("failed to send--got error ") + outcome_name(to_return));
00482 #endif
00483       break;
00484     }
00485   }
00486 
00487   if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
00488     // accomodate our latest activity on the socket.
00489     _sendings->zap(0, len_sent - 1);  // sent just some of it.
00490   }
00491 
00492   return to_return;
00493 }
00494 
00495 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
00496     infoton * &item, octopus_request_id &req_id, int timeout)
00497 {
00498   FUNCDEF("retrieve_and_restore_root");
00499   item = NIL;
00500   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
00501   octopus_request_id tmp_id;
00502   outcome to_return;
00503   time_stamp leaving_time(timeout);
00504 
00505   conditional_cleaning();
00506 
00507   do {
00508     // check if it's already in the bin from someone else grabbing it.
00509     if (get_anything)
00510       item = _requests->acquire_for_any(req_id);
00511     else
00512       item = _requests->acquire_for_identifier(req_id);
00513     if (item)
00514       return OKAY;
00515 
00516     // check to see if there's any data.
00517     grab_anything(timeout? true : false);
00518 
00519     push_outgoing(1);
00520 //hmmm: parameterize the push?
00521 
00522     // check again just to make sure.
00523     if (get_anything)
00524       item = _requests->acquire_for_any(req_id);
00525     else
00526       item = _requests->acquire_for_identifier(req_id);
00527     if (item)
00528       return OKAY;
00529 
00530     if (!timeout) return TIMED_OUT;
00531       // timeout is not set so we leave right away.
00532 
00533     if (!_commlink->connected()) return NO_CONNECTION;
00534 
00535     // keep going if we haven't seen it yet and still have time.
00536   } while (time_stamp() < leaving_time);
00537   return TIMED_OUT;
00538 }
00539 
00540 outcome cromp_common::retrieve_and_restore(infoton * &item,
00541     const octopus_request_id &req_id_in, int timeout)
00542 {
00543   octopus_request_id req_id = req_id_in;
00544   return retrieve_and_restore_root(false, item, req_id, timeout);
00545 }
00546 
00547 outcome cromp_common::retrieve_and_restore_any(infoton * &item,
00548     octopus_request_id &req_id, int timeout)
00549 { return retrieve_and_restore_root(true, item, req_id, timeout); }
00550 
00551 // rules: snarf_from_socket is in the second lowest-level tier.  it must not
00552 // call any other functions on cromp_common besides the send_buffer and
00553 // process_accumulator methods.
00554 void cromp_common::snarf_from_socket(bool wait)
00555 {
00556   FUNCDEF("snarf_from_socket");
00557   if (wait) {
00558 #ifdef DEBUG_CROMP_COMMON
00559 //    LOG(isprintf("awaiting rcptblty on socket %d.", _commlink->OS_socket()));
00560 #endif
00561     // snooze until data seems ready for chewing or until we time out.
00562     time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
00563     while (time_stamp() < stop_pausing) {
00564       if (!_commlink->connected()) return;
00565       outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
00566       if (wait_ret != spocket::NONE_READY)
00567         break;
00568       send_buffer();  // push out some data in between.
00569     }
00570   }
00571 
00572   outcome rcv_ret = spocket::OKAY;
00573   // this loop scrounges as much data as possible, within limits.
00574   int receptions = 0;
00575   while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
00576     int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
00577     {
00578       auto_synchronizer l(*_accum_lock);
00579       _receive_buffer->reset();  // clear pre-existing junk.
00580       rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
00581 #ifdef DEBUG_CROMP_COMMON
00582       if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00583         LOG(isprintf("received %d bytes on socket %d", rcv_size,
00584             _commlink->OS_socket()));
00585       } else if (rcv_ret != spocket::NONE_READY) {
00586         LOG(isprintf("no data on sock %d--outcome=", _commlink->OS_socket())
00587             + spocket::outcome_name(rcv_ret));
00588       }
00589 #endif
00590       if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00591         // we got some data from the receive, so store it.
00592         _bytes_received_total += _receive_buffer->length();
00593         *_accumulator += *_receive_buffer;  // add to overall accumulator.
00594         _last_data_seen->reset();
00595       }
00596     }
00597 
00598     send_buffer();
00599       // force data to go out also.
00600   }
00601 }
00602 
00603 void cromp_common::grab_anything(bool wait)
00604 {
00605   snarf_from_socket(wait);  // get any data that's waiting.
00606   process_accumulator();  // retrieve any commands we see.
00607 }
00608 
00609 #define CHECK_STALENESS \
00610   if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
00611     LOG("would resynch data due to staleness."); \
00612     _accumulator->zap(0, 0);  /* roast first byte */ \
00613     cromp_transaction::resynchronize(*_accumulator); \
00614     _last_data_seen->reset(); \
00615     continue; \
00616   }
00617 
00618 // process_accumulator should do nothing besides chewing on the buffer.
00619 // this puts it in the lowest-level tier.
00620 
00621 void cromp_common::process_accumulator()
00622 {
00623   FUNCDEF("process_accumulator");
00624   infoton *item = NIL;
00625   octopus_request_id req_id;
00626 
00627   string_array clas;
00628 
00629   if (!_accumulator->length()) return;
00630 
00631   // a little gymnastics to get a large buffer on the first try.
00632   byte_array temp_chow_buffer(CROMP_BUFFER_CHUNK_SIZE, NIL);
00633   temp_chow_buffer.reset();
00634 
00635   int cmds_found = 0;
00636 
00637   while (_accumulator->length()) {
00638 LOG(isprintf("eating command %d", cmds_found++));
00639     {
00640       // first block tries to extract data from the accumulator.
00641       auto_synchronizer l(*_accum_lock);
00642       // there are some contents; let's look at them.
00643       int packed_length = 0;
00644       outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
00645           packed_length);
00646       if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
00647           || (peek_ret == cromp_transaction::PARTIAL) ) {
00648         // not ready yet.
00649         CHECK_STALENESS;
00650         return;
00651       } else if (peek_ret != cromp_transaction::OKAY) {
00652         LOG(istring("error unpacking--peek error=")
00653             + cromp_transaction::outcome_name(peek_ret));
00654         // try to get to a real command.
00655         _accumulator->zap(0, 0);  // roast first byte.
00656         if (cromp_transaction::resynchronize(*_accumulator)) continue;
00657         return;
00658       }
00659 
00660 #ifdef DEBUG_CROMP_COMMON
00661       LOG("seeing command ready");
00662 #endif
00663       // temp buffer for undoing cromp transaction.
00664       if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
00665         LOG("failed to unpack even though peek was happy!");
00666         // try to get to a real command.
00667         _accumulator->zap(0, 0);  // roast first byte.
00668         if (cromp_transaction::resynchronize(*_accumulator)) continue;
00669         return;
00670       }
00671 #ifdef DEBUG_CROMP_COMMON
00672       LOG(istring("got req id of ") + req_id.mangled_form());
00673 #endif
00674 
00675       // now unwrap the onion a bit more to find the real object being sent.
00676       if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
00677         // try to resynch on transaction boundary.
00678         LOG("failed to get back a packed infoton!");
00679         _accumulator->zap(0, 0);  // roast first byte.
00680         if (cromp_transaction::resynchronize(*_accumulator)) continue;
00681         return;
00682       }
00683 #ifdef DEBUG_CROMP_COMMON
00684       LOG(istring("got classifier of ") + clas.text_form());
00685 #endif
00686     } // end of protected area.
00687 
00688     // restore the infoton from the packed form.
00689     outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
00690     if (rest_ret != tentacle::OKAY) {
00691 #ifdef DEBUG_CROMP_COMMON
00692       LOG(istring("our octopus couldn't restore the packed data! ")
00693           + outcome_name(rest_ret));
00694 #endif
00695       // publish an unhandled request back to the requestor.
00696       _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
00697           req_id);
00698     } else {
00699       // we finally have reached a point where we have a valid infoton.
00700       if (_requests->add_item(item, req_id))
00701         cmds_found++;
00702 #ifdef DEBUG_CROMP_COMMON
00703       else
00704         LOG("failed to add item to bin due to space constraints.");
00705 #endif
00706     }
00707 LOG(isprintf("ate command %d", cmds_found));
00708   }
00710 }
00711 
00712 bool cromp_common::decode_host(const istring &coded_host, istring &hostname,
00713     machine_uid &machine)
00714 {
00715   if (coded_host.length() < HOSTCHOP) return false;  // not big enough.
00716   hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
00717   const istring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
00718       coded_host.length() - 1);
00719   machine = machine_uid::expand(compact_uid);
00720   if (!machine.valid()) return false;
00721   return true;
00722 }
00723 
00724 
00725 #endif //CROMP_COMMON_IMPLEMENTATION_FILE
00726 

Generated on Fri Nov 28 04:28:54 2008 for HOOPLE Libraries by  doxygen 1.5.1