00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "cromp_common.h"
00032 #include "cromp_transaction.h"
00033
00034 #include <basis/byte_array.h>
00035 #include <basis/functions.h>
00036 #include <basis/astring.h>
00037 #include <basis/mutex.h>
00038 #include <crypto/rsa_crypto.h>
00039 #include <loggers/program_wide_logger.h>
00040 #include <octopus/entity_data_bin.h>
00041 #include <octopus/entity_defs.h>
00042 #include <octopus/infoton.h>
00043 #include <octopus/octopus.h>
00044 #include <octopus/tentacle.h>
00045 #include <octopus/unhandled_request.h>
00046 #include <sockets/internet_address.h>
00047 #include <sockets/machine_uid.h>
00048 #include <sockets/spocket.h>
00049 #include <sockets/tcpip_stack.h>
00050 #include <structures/static_memory_gremlin.h>
00051 #include <tentacles/encryption_infoton.h>
00052 #include <textual/byte_formatter.h>
00053 #include <timely/time_stamp.h>
00054
00055 using namespace basis;
00056 using namespace crypto;
00057 using namespace loggers;
00058 using namespace octopi;
00059 using namespace sockets;
00060 using namespace structures;
00061 using namespace textual;
00062 using namespace timely;
00063
00064 namespace cromp {
00065
00066
00067
00068
00069 #undef LOG
00070 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
00071
00072 const int STALENESS_PERIOD = 2 * MINUTE_ms;
00073
00074
00075
00076 const int SEND_DELAY_TIME = 200;
00077
00078
00079 const int DATA_AWAIT_SNOOZE = 80;
00080
00081
00082 const int QUICK_CROMP_SNOOZE = 28;
00083
00084
00085
00086 const int CROMP_BUFFER_CHUNK_SIZE = 256 * KILOBYTE;
00087
00088
00089 const int MAXIMUM_RECEIVES = 70;
00090
00091
00092 const int MAXIMUM_SEND = 128 * KILOBYTE;
00093
00094
00095
00096 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
00097
00098
00099
00100 const int cromp_common::HOSTCHOP = 6;
00101
00102
00103
00104 double cromp_common::_bytes_sent_total = 0.0;
00105 double cromp_common::_bytes_received_total = 0.0;
00106
00107 SAFE_STATIC_CONST(rsa_crypto, _hidden_localhost_only_key,
00108 (encryption_infoton::RSA_KEY_SIZE))
00109 const rsa_crypto &cromp_common::localhost_only_key() {
00110 #ifdef DEBUG_CROMP_COMMON
00111 FUNCDEF("localhost_only_key");
00112 #endif
00113 static bool initted = false;
00114 #ifdef DEBUG_CROMP_COMMON
00115 bool was_initted = initted;
00116 #endif
00117 initted = true;
00118 #ifdef DEBUG_CROMP_COMMON
00119 if (!was_initted)
00120 LOG("started creating localhost RSA key.");
00121 #endif
00122 const rsa_crypto &to_return = _hidden_localhost_only_key();
00123 #ifdef DEBUG_CROMP_COMMON
00124 if (!was_initted)
00125 LOG("done creating localhost RSA key.");
00126 #endif
00127 return to_return;
00128 }
00129
00130 cromp_common::cromp_common(const astring &host, int max_per_ent)
00131 : _commlink(NIL),
00132 _octopus(new octopus(host, max_per_ent)),
00133 _singleton(NIL),
00134 _requests(new entity_data_bin(max_per_ent)),
00135 _accum_lock(new mutex),
00136 _last_data_seen(new time_stamp),
00137 _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00138 _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00139 _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00140 _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00141 _last_cleanup(new time_stamp)
00142 {
00143
00144
00145 _accumulator->reset();
00146 _sendings->reset();
00147 _receive_buffer->reset();
00148 _still_flat->reset();
00149 }
00150
00151 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
00152 : _commlink(preexisting),
00153 _octopus(singleton),
00154 _singleton(singleton),
00155 _requests(new entity_data_bin(singleton?
00156 singleton->responses().max_bytes_per_entity()
00157 : DEFAULT_MAX_ENTITY_QUEUE)),
00158 _accum_lock(new mutex),
00159 _last_data_seen(new time_stamp),
00160 _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00161 _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00162 _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00163 _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00164 _last_cleanup(new time_stamp)
00165 {
00166 FUNCDEF("constructor [preexisting/singleton]");
00167 if (!_octopus) {
00168
00169 LOG("singleton passed as NIL; constructing new octopus instead.");
00170 internet_address local(internet_address::localhost(), "localhost", 0);
00171 _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
00172 }
00173
00174 _accumulator->reset();
00175 _sendings->reset();
00176 _receive_buffer->reset();
00177 _still_flat->reset();
00178 }
00179
00180 cromp_common::~cromp_common()
00181 {
00182
00183 close_common();
00184 if (_singleton) {
00185 _singleton = NIL;
00186 _octopus = NIL;
00187 } else {
00188
00189 WHACK(_octopus);
00190 }
00191 WHACK(_accumulator);
00192 WHACK(_sendings);
00193 WHACK(_commlink);
00194 WHACK(_requests);
00195 WHACK(_last_cleanup);
00196 WHACK(_last_data_seen);
00197 WHACK(_receive_buffer);
00198 WHACK(_still_flat);
00199 WHACK(_accum_lock);
00200 }
00201
00202 spocket *cromp_common::spock() const { return _commlink; }
00203
00204 int cromp_common::default_port() { return 10008; }
00205
00206 outcome cromp_common::add_tentacle(tentacle *to_add, bool filter)
00207 { return _octopus->add_tentacle(to_add, filter); }
00208
00209 int cromp_common::pending_sends() const
00210 {
00211 auto_synchronizer l(*_accum_lock);
00212 return _sendings->length();
00213 }
00214
00215 int cromp_common::accumulated_bytes() const
00216 {
00217 auto_synchronizer l(*_accum_lock);
00218 return _accumulator->length();
00219 }
00220
00221 astring cromp_common::chew_hostname(const internet_address &addr,
00222 internet_address *resolved_form)
00223 {
00224 #ifdef DEBUG_CROMP_COMMON
00225 FUNCDEF("chew_hostname");
00226 LOG(astring("addr coming in ") + addr.text_form());
00227 #endif
00228 tcpip_stack stack;
00229 bool worked;
00230 internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
00231 worked);
00232 if (worked) {
00233 if (resolved_form) *resolved_form = res1;
00234 #ifdef DEBUG_CROMP_COMMON
00235 LOG(astring("resolved addr ") + res1.text_form());
00236 #endif
00237 } else {
00238 #ifdef DEBUG_CROMP_COMMON
00239 LOG(astring("failed to resolve host=") + addr.hostname);
00240 #endif
00241 }
00242
00243
00244 astring just_host = res1.normalize_host();
00245 while (just_host.length() < HOSTCHOP) just_host += "-";
00246 machine_uid converted = res1.convert();
00247 astring to_return = just_host.substring(0, HOSTCHOP - 1);
00248 to_return += converted.compact_form();
00249
00250 #ifdef DEBUG_CROMP_COMMON
00251 LOG(astring("returning machid ") + converted.text_form() + ", packed as "
00252 + parser_bits::platform_eol_to_chars()
00253 + byte_formatter::text_dump((abyte *)to_return.s(),
00254 to_return.length() + 1));
00255 #endif
00256
00257 return to_return;
00258 }
00259
00260 astring cromp_common::responses_text_form() const
00261 { return _requests->text_form(); }
00262
00263 internet_address cromp_common::other_side() const
00264 {
00265 if (!_commlink) return internet_address();
00266 return _commlink->where();
00267 }
00268
00269 int cromp_common::max_bytes_per_entity() const
00270 { return _requests->max_bytes_per_entity(); }
00271
00272 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
00273 {
00274 _requests->max_bytes_per_entity(max_bytes_per_entity);
00275 _octopus->responses().max_bytes_per_entity(max_bytes_per_entity);
00276 }
00277
00278 void cromp_common::conditional_cleaning()
00279 {
00280
00281 if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
00282 _requests->clean_out_deadwood();
00283
00284 _last_cleanup->reset();
00285
00286 }
00287 }
00288
00289 outcome cromp_common::open_common(const internet_address &where)
00290 {
00291 #ifdef DEBUG_CROMP_COMMON
00292 FUNCDEF("open_common");
00293 #endif
00294 if (_singleton && _commlink)
00295 return OKAY;
00296
00297 if (_commlink) WHACK(_commlink);
00298
00299 internet_address other_side = where;
00300
00301 #ifdef DEBUG_CROMP_COMMON
00302 LOG(astring("opening at ") + other_side.text_form());
00303 #endif
00304 _commlink = new spocket(other_side);
00305
00306
00307 return OKAY;
00308 }
00309
00310 outcome cromp_common::close_common()
00311 {
00312 if (_commlink) _commlink->disconnect();
00313 return OKAY;
00314 }
00315
00316 const char *cromp_common::outcome_name(const outcome &to_name)
00317 {
00318 switch (to_name.value()) {
00319 case TOO_FULL: return "TOO_FULL";
00320 case PARTIAL: return "PARTIAL";
00321 default: return communication_commons::outcome_name(to_name);
00322 }
00323 }
00324
00325 outcome cromp_common::pack_and_ship(const infoton_list &requests,
00326 int max_tries)
00327 {
00328 FUNCDEF("pack_and_ship [multiple]");
00329 if (!_commlink) return BAD_INPUT;
00330 conditional_cleaning();
00331 {
00332 auto_synchronizer l(*_accum_lock);
00333 for (int i = 0; i < requests.elements(); i++) {
00334 if (!requests[i] || !requests[i]->_data) {
00335
00336 LOG("error in infoton_list; missing data element.");
00337 continue;
00338 }
00339 cromp_transaction::flatten(*_sendings, *requests[i]->_data,
00340 requests[i]->_id);
00341 }
00342 }
00343
00344 return push_outgoing(max_tries);
00345 }
00346
00347 bool cromp_common::buffer_clog(int max_buff) const
00348 {
00349 auto_synchronizer l(*_accum_lock);
00350 return _sendings->length() >= max_buff;
00351 }
00352
00353 outcome cromp_common::pack_and_ship(const infoton &request,
00354 const octopus_request_id &item_id, int max_tries)
00355 {
00356 #ifdef DEBUG_CROMP_COMMON
00357 FUNCDEF("pack_and_ship [single]");
00358 #endif
00359 if (!_commlink) return BAD_INPUT;
00360 conditional_cleaning();
00361
00362 #ifdef DEBUG_CROMP_COMMON
00363 LOG(astring("sending req ") + item_id.mangled_form());
00364 #endif
00365
00366 {
00367 auto_synchronizer l(*_accum_lock);
00368 cromp_transaction::flatten(*_sendings, request, item_id);
00369 }
00370
00371 return push_outgoing(max_tries);
00372 }
00373
00374 outcome cromp_common::push_outgoing(int max_tries)
00375 {
00376 FUNCDEF("push_outgoing");
00377
00378 if (!max_tries) return cromp_common::OKAY;
00379
00380
00381 grab_anything(false);
00382
00383 outcome to_return = cromp_common::TOO_FULL;
00384 int attempts = 0;
00385 while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
00386 to_return = send_buffer();
00387 grab_anything(false);
00388 if (to_return == cromp_common::OKAY)
00389 break;
00390 if (to_return == cromp_common::PARTIAL) {
00391
00392 attempts = 0;
00393 to_return = cromp_common::TOO_FULL;
00394
00395 continue;
00396 }
00397 if (to_return == cromp_common::TOO_FULL) {
00398
00399
00400 time_stamp stop_pausing(SEND_DELAY_TIME);
00401 while (time_stamp() < stop_pausing) {
00402 LOG("into too full looping...");
00403 if (!_commlink->connected()) break;
00404 grab_anything(true);
00405
00406 outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
00407 if (ret != spocket::NONE_READY)
00408 break;
00409 }
00410 } else {
00411 LOG(astring("failed send: ") + cromp_common::outcome_name(to_return));
00412 break;
00413 }
00414 }
00415 return to_return;
00416 }
00417
00418
00419
00420
00421 outcome cromp_common::send_buffer()
00422 {
00423 #ifdef DEBUG_CROMP_COMMON
00424 FUNCDEF("send_buffer");
00425 #endif
00426 auto_synchronizer l(*_accum_lock);
00427
00428
00429 if (!_sendings->length())
00430 return OKAY;
00431
00432 int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
00433 #ifdef DEBUG_CROMP_COMMON
00434
00435
00436 #endif
00437 int len_sent = 0;
00438 outcome to_return;
00439 outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
00440 len_sent);
00441 switch (send_ret.value()) {
00442 case spocket::OKAY: {
00443
00444 #ifdef DEBUG_CROMP_COMMON
00445
00446
00447 #endif
00448 _bytes_sent_total += len_sent;
00449 to_return = OKAY;
00450 break;
00451 }
00452 case spocket::PARTIAL: {
00453
00454 #ifdef DEBUG_CROMP_COMMON
00455 LOG(a_sprintf("partial send of %d bytes (of %d desired) on socket %d.",
00456 len_sent, size_to_send, _commlink->OS_socket()));
00457 #endif
00458 _bytes_sent_total += len_sent;
00459 to_return = PARTIAL;
00460 break;
00461 }
00462 case spocket::NONE_READY: {
00463
00464 #ifdef DEBUG_CROMP_COMMON
00465 LOG(a_sprintf("too full to send any on socket %d.",
00466 _commlink->OS_socket()));
00467 #endif
00468 len_sent = 0;
00469 to_return = TOO_FULL;
00470 break;
00471 }
00472 default: {
00473
00474 #ifdef DEBUG_CROMP_COMMON
00475 LOG(astring("failing send with ") + spocket::outcome_name(send_ret));
00476 #endif
00477 len_sent = 0;
00478
00479
00480 if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
00481 else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
00482
00483 else to_return = DISALLOWED;
00484
00485 #ifdef DEBUG_CROMP_COMMON
00486 LOG(astring("failed to send--got error ") + outcome_name(to_return));
00487 #endif
00488 break;
00489 }
00490 }
00491
00492 if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
00493
00494 _sendings->zap(0, len_sent - 1);
00495 }
00496
00497 return to_return;
00498 }
00499
00500 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
00501 infoton * &item, octopus_request_id &req_id, int timeout)
00502 {
00503
00504 item = NIL;
00505 if (!_commlink) return BAD_INPUT;
00506 octopus_request_id tmp_id;
00507 time_stamp leaving_time(timeout);
00508
00509 conditional_cleaning();
00510
00511 do {
00512
00513 if (get_anything)
00514 item = _requests->acquire_for_any(req_id);
00515 else
00516 item = _requests->acquire_for_identifier(req_id);
00517 if (item)
00518 return OKAY;
00519
00520
00521 grab_anything(timeout? true : false);
00522
00523 push_outgoing(1);
00524
00525
00526
00527
00528 if (get_anything)
00529 item = _requests->acquire_for_any(req_id);
00530 else
00531 item = _requests->acquire_for_identifier(req_id);
00532 if (item)
00533 return OKAY;
00534
00535 if (!timeout) return TIMED_OUT;
00536
00537
00538 if (!_commlink->connected()) return NO_CONNECTION;
00539
00540
00541 } while (time_stamp() < leaving_time);
00542 return TIMED_OUT;
00543 }
00544
00545 outcome cromp_common::retrieve_and_restore(infoton * &item,
00546 const octopus_request_id &req_id_in, int timeout)
00547 {
00548 octopus_request_id req_id = req_id_in;
00549 return retrieve_and_restore_root(false, item, req_id, timeout);
00550 }
00551
00552 outcome cromp_common::retrieve_and_restore_any(infoton * &item,
00553 octopus_request_id &req_id, int timeout)
00554 { return retrieve_and_restore_root(true, item, req_id, timeout); }
00555
00556
00557
00558
00559 void cromp_common::snarf_from_socket(bool wait)
00560 {
00561 #ifdef DEBUG_CROMP_COMMON
00562 FUNCDEF("snarf_from_socket");
00563 #endif
00564 if (wait) {
00565 #ifdef DEBUG_CROMP_COMMON
00566
00567 #endif
00568
00569 time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
00570 while (time_stamp() < stop_pausing) {
00571 if (!_commlink->connected()) return;
00572 outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
00573 if (wait_ret != spocket::NONE_READY)
00574 break;
00575 send_buffer();
00576 }
00577 }
00578
00579 outcome rcv_ret = spocket::OKAY;
00580
00581 int receptions = 0;
00582 while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
00583 int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
00584 {
00585 auto_synchronizer l(*_accum_lock);
00586 _receive_buffer->reset();
00587 rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
00588 #ifdef DEBUG_CROMP_COMMON
00589 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00590 LOG(a_sprintf("received %d bytes on socket %d", rcv_size,
00591 _commlink->OS_socket()));
00592 } else if (rcv_ret != spocket::NONE_READY) {
00593 LOG(a_sprintf("no data on sock %d--outcome=", _commlink->OS_socket())
00594 + spocket::outcome_name(rcv_ret));
00595 }
00596 #endif
00597 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00598
00599 _bytes_received_total += _receive_buffer->length();
00600 *_accumulator += *_receive_buffer;
00601 _last_data_seen->reset();
00602 }
00603 }
00604
00605 send_buffer();
00606
00607 }
00608 }
00609
00610 void cromp_common::grab_anything(bool wait)
00611 {
00612 snarf_from_socket(wait);
00613 process_accumulator();
00614 }
00615
00616 #define CHECK_STALENESS \
00617 if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
00618 LOG("would resynch data due to staleness."); \
00619 _accumulator->zap(0, 0); \
00620 cromp_transaction::resynchronize(*_accumulator); \
00621 _last_data_seen->reset(); \
00622 continue; \
00623 }
00624
00625
00626
00627
00628 void cromp_common::process_accumulator()
00629 {
00630 FUNCDEF("process_accumulator");
00631 infoton *item = NIL;
00632 octopus_request_id req_id;
00633
00634 string_array clas;
00635
00636 if (!_accumulator->length()) return;
00637
00638
00639 byte_array temp_chow_buffer(CROMP_BUFFER_CHUNK_SIZE, NIL);
00640 temp_chow_buffer.reset();
00641
00642 int cmds_found = 0;
00643
00644 while (_accumulator->length()) {
00645 LOG(a_sprintf("eating command %d", cmds_found++));
00646 {
00647
00648 auto_synchronizer l(*_accum_lock);
00649
00650 int packed_length = 0;
00651 outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
00652 packed_length);
00653 if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
00654 || (peek_ret == cromp_transaction::PARTIAL) ) {
00655
00656 CHECK_STALENESS;
00657 return;
00658 } else if (peek_ret != cromp_transaction::OKAY) {
00659 LOG(astring("error unpacking--peek error=")
00660 + cromp_transaction::outcome_name(peek_ret));
00661
00662 _accumulator->zap(0, 0);
00663 if (cromp_transaction::resynchronize(*_accumulator)) continue;
00664 return;
00665 }
00666
00667 #ifdef DEBUG_CROMP_COMMON
00668 LOG("seeing command ready");
00669 #endif
00670
00671 if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
00672 LOG("failed to unpack even though peek was happy!");
00673
00674 _accumulator->zap(0, 0);
00675 if (cromp_transaction::resynchronize(*_accumulator)) continue;
00676 return;
00677 }
00678 #ifdef DEBUG_CROMP_COMMON
00679 LOG(astring("got req id of ") + req_id.mangled_form());
00680 #endif
00681
00682
00683 if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
00684
00685 LOG("failed to get back a packed infoton!");
00686 _accumulator->zap(0, 0);
00687 if (cromp_transaction::resynchronize(*_accumulator)) continue;
00688 return;
00689 }
00690 #ifdef DEBUG_CROMP_COMMON
00691 LOG(astring("got classifier of ") + clas.text_form());
00692 #endif
00693 }
00694
00695
00696 outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
00697 if (rest_ret != tentacle::OKAY) {
00698 #ifdef DEBUG_CROMP_COMMON
00699 LOG(astring("our octopus couldn't restore the packed data! ")
00700 + outcome_name(rest_ret));
00701 #endif
00702
00703 _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
00704 req_id);
00705 } else {
00706
00707 if (_requests->add_item(item, req_id))
00708 cmds_found++;
00709 #ifdef DEBUG_CROMP_COMMON
00710 else
00711 LOG("failed to add item to bin due to space constraints.");
00712 #endif
00713 }
00714 LOG(a_sprintf("ate command %d", cmds_found));
00715 }
00717 }
00718
00719 bool cromp_common::decode_host(const astring &coded_host, astring &hostname,
00720 machine_uid &machine)
00721 {
00722 if (coded_host.length() < HOSTCHOP) return false;
00723 hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
00724 const astring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
00725 coded_host.length() - 1);
00726 machine = machine_uid::expand(compact_uid);
00727 if (!machine.valid()) return false;
00728 return true;
00729 }
00730
00731 }
00732