00001 #ifndef CROMP_COMMON_IMPLEMENTATION_FILE
00002 #define CROMP_COMMON_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #include "cromp_common.h"
00035 #include "cromp_transaction.h"
00036
00037 #include <basis/byte_array.h>
00038 #include <basis/function.h>
00039 #include <basis/istring.h>
00040 #include <basis/log_base.h>
00041 #include <basis/mutex.h>
00042 #include <basis/portable.h>
00043 #include <data_struct/static_memory_gremlin.h>
00044 #include <mechanisms/time_stamp.h>
00045 #include <octopus/entity_data_bin.h>
00046 #include <octopus/entity_defs.h>
00047 #include <octopus/infoton.h>
00048 #include <octopus/octopus.h>
00049 #include <octopus/tentacle.h>
00050 #include <octopus/unhandled_request.h>
00051 #include <loggers/file_logger.h>
00052 #include <sockets/address.h>
00053 #include <sockets/machine_uid.h>
00054 #include <sockets/spocket.h>
00055 #include <sockets/tcpip_stack.h>
00056 #include <textual/byte_format.h>
00057
00058
00059
00060
00061 #undef LOG
00062 #ifdef DEBUG_CROMP_COMMON
00063
00064
00065
00066 #define LOG(s) CLASS_EMERGENCY_LOG(file_logger(portable::env_string("TMP") + "/cromp_common.log"), s)
00067 #else
00068 #define LOG(s)
00069 #endif
00070
00071 const int STALENESS_PERIOD = 2 * MINUTE_ms;
00072
00073
00074
00075 const int SEND_DELAY_TIME = 200;
00076
00077
00078 const int DATA_AWAIT_SNOOZE = 80;
00079
00080
00081 const int QUICK_CROMP_SNOOZE = 28;
00082
00083
00084
00085 const int CROMP_BUFFER_CHUNK_SIZE = 256 * KILOBYTE;
00086
00087
00088 const int MAXIMUM_RECEIVES = 70;
00089
00090
00091 const int MAXIMUM_SEND = 128 * KILOBYTE;
00092
00093
00094
00095 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
00096
00097
00098
00099 const int cromp_common::HOSTCHOP = 6;
00100
00101
00102
00103 double cromp_common::_bytes_sent_total = 0.0;
00104 double cromp_common::_bytes_received_total = 0.0;
00105
00106 #ifndef OMIT_CRYPTO_SUPPORT
00107 #include <crypto/rsa_crypto.h>
00108 #include <tentacles/encryption_infoton.h>
00109 SAFE_STATIC_CONST(RSA_crypto, _hidden_localhost_only_key,
00110 (encryption_infoton::RSA_KEY_SIZE))
00111 const RSA_crypto &cromp_common::localhost_only_key() {
00112 FUNCDEF("localhost_only_key");
00113 static bool initted = false;
00114 #ifdef DEBUG_CROMP_COMMON
00115 bool was_initted = initted;
00116 #endif
00117 initted = true;
00118 #ifdef DEBUG_CROMP_COMMON
00119 if (!was_initted)
00120 LOG("started creating localhost RSA key.");
00121 #endif
00122 const RSA_crypto &to_return = _hidden_localhost_only_key();
00123 #ifdef DEBUG_CROMP_COMMON
00124 if (!was_initted)
00125 LOG("done creating localhost RSA key.");
00126 #endif
00127 return to_return;
00128 }
00129 #endif
00130
00131 cromp_common::cromp_common(const istring &host, int max_per_ent)
00132 : _commlink(NIL),
00133 _octopus(new octopus(host, max_per_ent)),
00134 _singleton(NIL),
00135 _requests(new entity_data_bin(max_per_ent)),
00136 _accum_lock(new mutex),
00137 _last_data_seen(new time_stamp),
00138 _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00139 _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00140 _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00141 _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00142 _last_cleanup(new time_stamp)
00143 {
00144 FUNCDEF("constructor [host/max_per_ent]");
00145
00146 _accumulator->reset();
00147 _sendings->reset();
00148 _receive_buffer->reset();
00149 _still_flat->reset();
00150 }
00151
00152 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
00153 : _commlink(preexisting),
00154 _octopus(singleton),
00155 _singleton(singleton),
00156 _requests(new entity_data_bin(singleton?
00157 singleton->responses().max_bytes_per_entity()
00158 : DEFAULT_MAX_ENTITY_QUEUE)),
00159 _accum_lock(new mutex),
00160 _last_data_seen(new time_stamp),
00161 _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00162 _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00163 _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00164 _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
00165 _last_cleanup(new time_stamp)
00166 {
00167 FUNCDEF("constructor [preexisting/singleton]");
00168 if (!_octopus) {
00169
00170 LOG("singleton passed as NIL; constructing new octopus instead.");
00171 internet_address local(internet_address::localhost(), "localhost", 0);
00172 _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
00173 }
00174
00175 _accumulator->reset();
00176 _sendings->reset();
00177 _receive_buffer->reset();
00178 _still_flat->reset();
00179 }
00180
00181 cromp_common::~cromp_common()
00182 {
00183 FUNCDEF("destructor");
00184 close_common();
00185 if (_singleton) {
00186 _singleton = NIL;
00187 _octopus = NIL;
00188 } else {
00189
00190 WHACK(_octopus);
00191 }
00192 WHACK(_accumulator);
00193 WHACK(_sendings);
00194 WHACK(_commlink);
00195 WHACK(_requests);
00196 WHACK(_last_cleanup);
00197 WHACK(_last_data_seen);
00198 WHACK(_receive_buffer);
00199 WHACK(_still_flat);
00200 WHACK(_accum_lock);
00201 }
00202
00203 spocket *cromp_common::spock() const { return _commlink; }
00204
00205 int cromp_common::default_port() { return 10008; }
00206
00207 outcome cromp_common::add_tentacle(tentacle *to_add, bool filter)
00208 { return _octopus->add_tentacle(to_add, filter); }
00209
00210 int cromp_common::pending_sends() const
00211 {
00212 auto_synchronizer l(*_accum_lock);
00213 return _sendings->length();
00214 }
00215
00216 int cromp_common::accumulated_bytes() const
00217 {
00218 auto_synchronizer l(*_accum_lock);
00219 return _accumulator->length();
00220 }
00221
00222 istring cromp_common::chew_hostname(const internet_address &addr,
00223 internet_address *resolved_form)
00224 {
00225 FUNCDEF("chew_hostname");
00226 #ifdef DEBUG_CROMP_COMMON
00227 LOG(istring("addr coming in ") + addr.text_form());
00228 #endif
00229 tcpip_stack stack;
00230 bool worked;
00231 internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
00232 worked);
00233 if (worked) {
00234 if (resolved_form) *resolved_form = res1;
00235 #ifdef DEBUG_CROMP_COMMON
00236 LOG(istring("resolved addr ") + res1.text_form());
00237 #endif
00238 } else {
00239 #ifdef DEBUG_CROMP_COMMON
00240 LOG(istring("failed to resolve host=") + addr.hostname);
00241 #endif
00242 }
00243
00244
00245 istring just_host = res1.normalize_host();
00246 while (just_host.length() < HOSTCHOP) just_host += "-";
00247 machine_uid converted = res1.convert();
00248 istring to_return = just_host.substring(0, HOSTCHOP - 1);
00249 to_return += converted.compact_form();
00250
00251 #ifdef DEBUG_CROMP_COMMON
00252 LOG(istring("returning machid ") + converted.text_form() + ", packed as "
00253 + log_base::platform_ending()
00254 + byte_format::text_dump((byte *)to_return.s(),
00255 to_return.length() + 1));
00256 #endif
00257
00258 return to_return;
00259 }
00260
00261 istring cromp_common::responses_text_form() const
00262 { return _requests->text_form(); }
00263
00264 internet_address cromp_common::other_side() const
00265 {
00266 if (!_commlink) return internet_address();
00267 return _commlink->where();
00268 }
00269
00270 int cromp_common::max_bytes_per_entity() const
00271 { return _requests->max_bytes_per_entity(); }
00272
00273 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
00274 {
00275 _requests->max_bytes_per_entity(max_bytes_per_entity);
00276 _octopus->responses().max_bytes_per_entity(max_bytes_per_entity);
00277 }
00278
00279 void cromp_common::conditional_cleaning()
00280 {
00281 FUNCDEF("conditional_cleaning");
00282 if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
00283 _requests->clean_out_deadwood();
00284
00285 _last_cleanup->reset();
00286
00287 }
00288 }
00289
00290 outcome cromp_common::open_common(const internet_address &where)
00291 {
00292 FUNCDEF("open_common");
00293 if (_singleton && _commlink)
00294 return OKAY;
00295
00296 if (_commlink) WHACK(_commlink);
00297
00298 internet_address other_side = where;
00299
00300 #ifdef DEBUG_CROMP_COMMON
00301 LOG(istring("opening at ") + other_side.text_form());
00302 #endif
00303 _commlink = new spocket(other_side);
00304
00305
00306 return OKAY;
00307 }
00308
00309 outcome cromp_common::close_common()
00310 {
00311 if (_commlink) _commlink->disconnect();
00312 return OKAY;
00313 }
00314
00315 const char *cromp_common::outcome_name(const outcome &to_name)
00316 {
00317 switch (to_name.value()) {
00318 case TOO_FULL: return "TOO_FULL";
00319 case PARTIAL: return "PARTIAL";
00320 default: return communication_commons::outcome_name(to_name);
00321 }
00322 }
00323
00324 outcome cromp_common::pack_and_ship(const infoton_list &requests,
00325 int max_tries)
00326 {
00327 FUNCDEF("pack_and_ship [multiple]");
00328 if (!_commlink) return BAD_INPUT;
00329 conditional_cleaning();
00330 {
00331 auto_synchronizer l(*_accum_lock);
00332 for (int i = 0; i < requests.elements(); i++) {
00333 if (!requests[i] || !requests[i]->_data) {
00334
00335 LOG("error in infoton_list; missing data element.");
00336 continue;
00337 }
00338 cromp_transaction::flatten(*_sendings, *requests[i]->_data,
00339 requests[i]->_id);
00340 }
00341 }
00342
00343 return push_outgoing(max_tries);
00344 }
00345
00346 bool cromp_common::buffer_clog(int max_buff) const
00347 {
00348 auto_synchronizer l(*_accum_lock);
00349 return _sendings->length() >= max_buff;
00350 }
00351
00352 outcome cromp_common::pack_and_ship(const infoton &request,
00353 const octopus_request_id &item_id, int max_tries)
00354 {
00355 FUNCDEF("pack_and_ship [single]");
00356 if (!_commlink) return BAD_INPUT;
00357 conditional_cleaning();
00358
00359 #ifdef DEBUG_CROMP_COMMON
00360 LOG(istring("sending req ") + item_id.mangled_form());
00361 #endif
00362
00363 {
00364 auto_synchronizer l(*_accum_lock);
00365 cromp_transaction::flatten(*_sendings, request, item_id);
00366 }
00367
00368 return push_outgoing(max_tries);
00369 }
00370
00371 outcome cromp_common::push_outgoing(int max_tries)
00372 {
00373 FUNCDEF("push_outgoing");
00374
00375 if (!max_tries) return cromp_common::OKAY;
00376
00377
00378 grab_anything(false);
00379
00380 outcome to_return = cromp_common::TOO_FULL;
00381 int attempts = 0;
00382 while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
00383 to_return = send_buffer();
00384 grab_anything(false);
00385 if (to_return == cromp_common::OKAY)
00386 break;
00387 if (to_return == cromp_common::PARTIAL) {
00388
00389 attempts = 0;
00390 to_return = cromp_common::TOO_FULL;
00391
00392 continue;
00393 }
00394 if (to_return == cromp_common::TOO_FULL) {
00395
00396
00397 time_stamp stop_pausing(SEND_DELAY_TIME);
00398 while (time_stamp() < stop_pausing) {
00399 LOG("into too full looping...");
00400 if (!_commlink->connected()) break;
00401 grab_anything(true);
00402
00403 outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
00404 if (ret != spocket::NONE_READY)
00405 break;
00406 }
00407 } else {
00408 LOG(istring("failed send: ") + cromp_common::outcome_name(to_return));
00409 break;
00410 }
00411 }
00412 return to_return;
00413 }
00414
00415
00416
00417
00418 outcome cromp_common::send_buffer()
00419 {
00420 FUNCDEF("send_buffer");
00421 auto_synchronizer l(*_accum_lock);
00422
00423
00424 if (!_sendings->length())
00425 return OKAY;
00426
00427 int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
00428 #ifdef DEBUG_CROMP_COMMON
00429
00430
00431 #endif
00432 int len_sent = 0;
00433 outcome to_return;
00434 outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
00435 len_sent);
00436 switch (send_ret.value()) {
00437 case spocket::OKAY: {
00438
00439 #ifdef DEBUG_CROMP_COMMON
00440
00441
00442 #endif
00443 _bytes_sent_total += len_sent;
00444 to_return = OKAY;
00445 break;
00446 }
00447 case spocket::PARTIAL: {
00448
00449 #ifdef DEBUG_CROMP_COMMON
00450 LOG(isprintf("partial send of %d bytes (of %d desired) on socket %d.",
00451 len_sent, size_to_send, _commlink->OS_socket()));
00452 #endif
00453 _bytes_sent_total += len_sent;
00454 to_return = PARTIAL;
00455 break;
00456 }
00457 case spocket::NONE_READY: {
00458
00459 #ifdef DEBUG_CROMP_COMMON
00460 LOG(isprintf("too full to send any on socket %d.",
00461 _commlink->OS_socket()));
00462 #endif
00463 len_sent = 0;
00464 to_return = TOO_FULL;
00465 break;
00466 }
00467 default: {
00468
00469 #ifdef DEBUG_CROMP_COMMON
00470 LOG(istring("failing send with ") + spocket::outcome_name(send_ret));
00471 #endif
00472 len_sent = 0;
00473
00474
00475 if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
00476 else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
00477
00478 else to_return = DISALLOWED;
00479
00480 #ifdef DEBUG_CROMP_COMMON
00481 LOG(istring("failed to send--got error ") + outcome_name(to_return));
00482 #endif
00483 break;
00484 }
00485 }
00486
00487 if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
00488
00489 _sendings->zap(0, len_sent - 1);
00490 }
00491
00492 return to_return;
00493 }
00494
00495 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
00496 infoton * &item, octopus_request_id &req_id, int timeout)
00497 {
00498 FUNCDEF("retrieve_and_restore_root");
00499 item = NIL;
00500 if (!_commlink) return BAD_INPUT;
00501 octopus_request_id tmp_id;
00502 outcome to_return;
00503 time_stamp leaving_time(timeout);
00504
00505 conditional_cleaning();
00506
00507 do {
00508
00509 if (get_anything)
00510 item = _requests->acquire_for_any(req_id);
00511 else
00512 item = _requests->acquire_for_identifier(req_id);
00513 if (item)
00514 return OKAY;
00515
00516
00517 grab_anything(timeout? true : false);
00518
00519 push_outgoing(1);
00520
00521
00522
00523 if (get_anything)
00524 item = _requests->acquire_for_any(req_id);
00525 else
00526 item = _requests->acquire_for_identifier(req_id);
00527 if (item)
00528 return OKAY;
00529
00530 if (!timeout) return TIMED_OUT;
00531
00532
00533 if (!_commlink->connected()) return NO_CONNECTION;
00534
00535
00536 } while (time_stamp() < leaving_time);
00537 return TIMED_OUT;
00538 }
00539
00540 outcome cromp_common::retrieve_and_restore(infoton * &item,
00541 const octopus_request_id &req_id_in, int timeout)
00542 {
00543 octopus_request_id req_id = req_id_in;
00544 return retrieve_and_restore_root(false, item, req_id, timeout);
00545 }
00546
00547 outcome cromp_common::retrieve_and_restore_any(infoton * &item,
00548 octopus_request_id &req_id, int timeout)
00549 { return retrieve_and_restore_root(true, item, req_id, timeout); }
00550
00551
00552
00553
00554 void cromp_common::snarf_from_socket(bool wait)
00555 {
00556 FUNCDEF("snarf_from_socket");
00557 if (wait) {
00558 #ifdef DEBUG_CROMP_COMMON
00559
00560 #endif
00561
00562 time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
00563 while (time_stamp() < stop_pausing) {
00564 if (!_commlink->connected()) return;
00565 outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
00566 if (wait_ret != spocket::NONE_READY)
00567 break;
00568 send_buffer();
00569 }
00570 }
00571
00572 outcome rcv_ret = spocket::OKAY;
00573
00574 int receptions = 0;
00575 while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
00576 int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
00577 {
00578 auto_synchronizer l(*_accum_lock);
00579 _receive_buffer->reset();
00580 rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
00581 #ifdef DEBUG_CROMP_COMMON
00582 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00583 LOG(isprintf("received %d bytes on socket %d", rcv_size,
00584 _commlink->OS_socket()));
00585 } else if (rcv_ret != spocket::NONE_READY) {
00586 LOG(isprintf("no data on sock %d--outcome=", _commlink->OS_socket())
00587 + spocket::outcome_name(rcv_ret));
00588 }
00589 #endif
00590 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
00591
00592 _bytes_received_total += _receive_buffer->length();
00593 *_accumulator += *_receive_buffer;
00594 _last_data_seen->reset();
00595 }
00596 }
00597
00598 send_buffer();
00599
00600 }
00601 }
00602
00603 void cromp_common::grab_anything(bool wait)
00604 {
00605 snarf_from_socket(wait);
00606 process_accumulator();
00607 }
00608
00609 #define CHECK_STALENESS \
00610 if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
00611 LOG("would resynch data due to staleness."); \
00612 _accumulator->zap(0, 0); \
00613 cromp_transaction::resynchronize(*_accumulator); \
00614 _last_data_seen->reset(); \
00615 continue; \
00616 }
00617
00618
00619
00620
00621 void cromp_common::process_accumulator()
00622 {
00623 FUNCDEF("process_accumulator");
00624 infoton *item = NIL;
00625 octopus_request_id req_id;
00626
00627 string_array clas;
00628
00629 if (!_accumulator->length()) return;
00630
00631
00632 byte_array temp_chow_buffer(CROMP_BUFFER_CHUNK_SIZE, NIL);
00633 temp_chow_buffer.reset();
00634
00635 int cmds_found = 0;
00636
00637 while (_accumulator->length()) {
00638 LOG(isprintf("eating command %d", cmds_found++));
00639 {
00640
00641 auto_synchronizer l(*_accum_lock);
00642
00643 int packed_length = 0;
00644 outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
00645 packed_length);
00646 if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
00647 || (peek_ret == cromp_transaction::PARTIAL) ) {
00648
00649 CHECK_STALENESS;
00650 return;
00651 } else if (peek_ret != cromp_transaction::OKAY) {
00652 LOG(istring("error unpacking--peek error=")
00653 + cromp_transaction::outcome_name(peek_ret));
00654
00655 _accumulator->zap(0, 0);
00656 if (cromp_transaction::resynchronize(*_accumulator)) continue;
00657 return;
00658 }
00659
00660 #ifdef DEBUG_CROMP_COMMON
00661 LOG("seeing command ready");
00662 #endif
00663
00664 if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
00665 LOG("failed to unpack even though peek was happy!");
00666
00667 _accumulator->zap(0, 0);
00668 if (cromp_transaction::resynchronize(*_accumulator)) continue;
00669 return;
00670 }
00671 #ifdef DEBUG_CROMP_COMMON
00672 LOG(istring("got req id of ") + req_id.mangled_form());
00673 #endif
00674
00675
00676 if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
00677
00678 LOG("failed to get back a packed infoton!");
00679 _accumulator->zap(0, 0);
00680 if (cromp_transaction::resynchronize(*_accumulator)) continue;
00681 return;
00682 }
00683 #ifdef DEBUG_CROMP_COMMON
00684 LOG(istring("got classifier of ") + clas.text_form());
00685 #endif
00686 }
00687
00688
00689 outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
00690 if (rest_ret != tentacle::OKAY) {
00691 #ifdef DEBUG_CROMP_COMMON
00692 LOG(istring("our octopus couldn't restore the packed data! ")
00693 + outcome_name(rest_ret));
00694 #endif
00695
00696 _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
00697 req_id);
00698 } else {
00699
00700 if (_requests->add_item(item, req_id))
00701 cmds_found++;
00702 #ifdef DEBUG_CROMP_COMMON
00703 else
00704 LOG("failed to add item to bin due to space constraints.");
00705 #endif
00706 }
00707 LOG(isprintf("ate command %d", cmds_found));
00708 }
00710 }
00711
00712 bool cromp_common::decode_host(const istring &coded_host, istring &hostname,
00713 machine_uid &machine)
00714 {
00715 if (coded_host.length() < HOSTCHOP) return false;
00716 hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
00717 const istring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
00718 coded_host.length() - 1);
00719 machine = machine_uid::expand(compact_uid);
00720 if (!machine.valid()) return false;
00721 return true;
00722 }
00723
00724
00725 #endif //CROMP_COMMON_IMPLEMENTATION_FILE
00726