00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "cromp_common.h"
00016 #include "cromp_security.h"
00017 #include "cromp_server.h"
00018
00019 #include <basis/astring.h>
00020 #include <basis/functions.h>
00021 #include <basis/mutex.h>
00022 #include <loggers/program_wide_logger.h>
00023 #include <octopus/entity_data_bin.h>
00024 #include <octopus/entity_defs.h>
00025 #include <octopus/identity_infoton.h>
00026 #include <octopus/infoton.h>
00027 #include <octopus/tentacle.h>
00028 #include <octopus/unhandled_request.h>
00029 #include <processes/ethread.h>
00030 #include <processes/thread_cabinet.h>
00031 #include <sockets/internet_address.h>
00032 #include <sockets/tcpip_stack.h>
00033 #include <sockets/spocket.h>
00034 #include <structures/amorph.h>
00035 #include <structures/unique_id.h>
00036 #include <tentacles/key_repository.h>
00037 #include <tentacles/login_tentacle.h>
00038 #include <tentacles/encryption_tentacle.h>
00039 #include <tentacles/encryption_wrapper.h>
00040 #include <timely/time_control.h>
00041
00042 using namespace basis;
00043 using namespace loggers;
00044 using namespace octopi;
00045 using namespace processes;
00046 using namespace sockets;
00047 using namespace structures;
00048 using namespace timely;
00049
00050 namespace cromp {
00051
00052
00053
00054
00055 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
00056
00057
00058 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
00059
00060
00061
00062 const int SEND_TRIES_ALLOWED = 1;
00063
00064
00065 const int SEND_THRESHOLD = 512 * KILOBYTE;
00066
00067
00068
00069 const int EXTREME_SEND_TRIES_ALLOWED = 28;
00070
00071
00072 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
00073
00074
00075 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
00076
00077
00078 const int DROPPING_INTERVAL = 500;
00079
00080
00081 const int DATA_AWAIT_TIMEOUT = 14;
00082
00083
00084 const int ACCEPTANCE_SNOOZE = 60;
00085
00086
00087 #undef LOG
00088 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
00089
00091
00092
00093 class cromp_client_record;
00094
00095 class cromp_data_grabber : public ethread
00096 {
00097 public:
00098 cromp_data_grabber(cromp_client_record &parent, octopus *octo)
00099 : ethread(), _parent(parent), _octo(octo) {}
00100
00101 DEFINE_CLASS_NAME("cromp_data_grabber");
00102
00103 virtual void perform_activity(void *);
00104
00105 private:
00106 cromp_client_record &_parent;
00107 octopus *_octo;
00108 };
00109
00111
00112 class cromp_client_record : public cromp_common
00113 {
00114 public:
00115 cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
00116 login_tentacle &security)
00117 : cromp_common(client, octo),
00118 _parent(parent),
00119 _octo(octo),
00120 _ent(),
00121 _healthy(true),
00122 _fixated(false),
00123 _grabber(*this, octo),
00124 _waiting(),
00125 _still_connected(true),
00126 _security_arm(security)
00127 {
00128 internet_address local_addr = internet_address
00129 (internet_address::localhost(), client->stack().hostname(), 0);
00130 open_common(local_addr);
00131 _grabber.start(NIL);
00132 }
00133
00134 ~cromp_client_record() {
00135 croak();
00136 }
00137
00138 DEFINE_CLASS_NAME("cromp_client_record");
00139
00140 bool handle_client_needs(ethread &prompter) {
00141 #ifdef DEBUG_CROMP_SERVER
00142 FUNCDEF("handle_client_needs");
00143 time_stamp start;
00144 #endif
00145 if (!_healthy) return false;
00146 if (!spock()->connected()) {
00147 _still_connected = false;
00148 return false;
00149 }
00150 bool keep_going = true;
00151 int actions = 0;
00152 while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
00153
00154
00155 if (prompter.should_stop()) return false;
00156 keep_going = false;
00157 bool ret = get_incoming_data(actions);
00158 if (ret) keep_going = true;
00159 ret = push_client_replies(actions);
00160 if (ret) keep_going = true;
00161 }
00162
00163 #ifdef DEBUG_CROMP_SERVER
00164 if (actions > 10) {
00165 LOG(a_sprintf("actions=%d", actions));
00166 LOG(a_sprintf("%d pending send bytes, %d bytes accumulated, bin has "
00167 "%d items.", pending_sends(), accumulated_bytes(),
00168 octo()->responses().items_held()));
00169 }
00170
00171 int duration = int(time_stamp().value() - start.value());
00172 if (duration > 200) {
00173 LOG(a_sprintf("duration=%d ms.", duration));
00174 }
00175 #endif
00176
00177 return true;
00178 }
00179
00180 const octopus_entity &ent() const { return _ent; }
00181
00182
00183
00184 void croak() {
00185
00186 _grabber.stop();
00187 int actions = 0;
00188 while (get_incoming_data(actions)) {
00189
00190
00191 }
00192 _healthy = false;
00193
00194 _security_arm.expunge(_ent);
00195 close_common();
00196 }
00197
00198 bool healthy() const { return _healthy; }
00199
00200
00201 bool still_connected() const { return _still_connected; }
00202
00203
00204 cromp_server &parent() const { return _parent; }
00205
00206 bool push_client_replies(int &actions) {
00207 FUNCDEF("push_client_replies");
00208 if (!healthy()) return false;
00209 if (ent().blank()) {
00210
00211 #ifdef DEBUG_CROMP_SERVER
00212 LOG("not pushing replies for blank.");
00213 #endif
00214 return false;
00215 }
00216
00217 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00218 LOG("buffer clog being cleared now.");
00219
00220 push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
00221
00222 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00223 LOG("could not completely clear buffer clog.");
00224 return true;
00225 }
00226 LOG("cleared out buffer clog.");
00227 }
00228
00229 int any_left = true;
00230 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00231
00232 if (!_octo->responses().items_held()) {
00233 any_left = false;
00234 break;
00235 }
00236
00237 grab_anything(false);
00238
00239 int num_located = _octo->responses().acquire_for_entity(ent(),
00240 _waiting, MAXIMUM_SIZE_BATCH);
00241 if (!num_located) {
00242 any_left = false;
00243 break;
00244 }
00245
00246
00247 if (_parent.encrypting()) {
00248 for (int i = 0; i < _waiting.elements(); i++) {
00249 infoton *curr = _waiting[i]->_data;
00250 infoton *processed = _parent.wrap_infoton(curr,
00251 _waiting[i]->_id._entity);
00252 if (processed) _waiting[i]->_data = processed;
00253 }
00254 }
00255
00256 outcome ret = pack_and_ship(_waiting, 0);
00257
00258 if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
00259
00260
00261
00262 LOG(astring("failed to send package back to client: ")
00263 + cromp_common::outcome_name(ret));
00264
00265 any_left = false;
00266 break;
00267 }
00268
00269 if (pending_sends() > SEND_THRESHOLD) {
00270 #ifdef DEBUG_CROMP_SERVER
00271 LOG(astring("over sending threshold on ") + _ent.text_form());
00272 #endif
00273 push_outgoing(SEND_TRIES_ALLOWED);
00274 }
00275
00276 }
00277
00278 push_outgoing(SEND_TRIES_ALLOWED);
00279 if (!spock()->connected()) {
00280 #ifdef DEBUG_CROMP_SERVER
00281 LOG("noticed disconnection of client.");
00282 #endif
00283 _still_connected = false;
00284 }
00285 return any_left;
00286 }
00287
00288 bool get_incoming_data(int &actions) {
00289 FUNCDEF("get_incoming_data");
00290 if (!healthy()) return false;
00291 int first_one = true;
00292 bool saw_something = false;
00293 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00294
00295 infoton *item = NIL;
00296 octopus_request_id req_id;
00297 outcome ret = retrieve_and_restore_any(item, req_id,
00298 first_one? DATA_AWAIT_TIMEOUT : 0);
00299 first_one = false;
00300 if (ret == cromp_common::TIMED_OUT) {
00301 actions--;
00302 return false;
00303 } else if (ret != cromp_common::OKAY) {
00304 #ifdef DEBUG_CROMP_SERVER
00305 LOG(astring("got error ") + cromp_common::outcome_name(ret));
00306 #endif
00307 if (ret == cromp_common::NO_CONNECTION) {
00308 #ifdef DEBUG_CROMP_SERVER
00309 LOG("noticed disconnection of client.");
00310 #endif
00311 _still_connected = false;
00312 }
00313 actions--;
00314 return false;
00315 }
00316
00317 saw_something = true;
00318 if (!_fixated) {
00319 if (req_id._entity.blank()) {
00320 LOG(astring("would have assigned ours to blank id! ")
00321 + req_id._entity.mangled_form());
00322 WHACK(item);
00323 continue;
00324 }
00325 #ifdef DEBUG_CROMP_SERVER
00326 LOG(astring("cmd with entity ") + req_id._entity.mangled_form());
00327 #endif
00328 if (_ent.blank()) {
00329
00330 _ent = req_id._entity;
00331 #ifdef DEBUG_CROMP_SERVER
00332 LOG(astring("assigned own entity to ") + _ent.mangled_form());
00333 #endif
00334 } else if (!_fixated && (_ent != req_id._entity) ) {
00335 #ifdef DEBUG_CROMP_SERVER
00336 LOG(astring("fixated on entity of ") + req_id._entity.mangled_form()
00337 + " where we used to have " + _ent.mangled_form());
00338 #endif
00339 _ent = req_id._entity;
00340 _fixated = true;
00341 }
00342 }
00343 #ifdef DEBUG_CROMP_SERVER
00344 else if (_ent != req_id._entity) {
00345
00346 #ifdef DEBUG_CROMP_SERVER
00347 LOG(astring("seeing wrong entity of ") + req_id._entity.mangled_form()
00348 + " when we fixated on " + _ent.mangled_form());
00349 #endif
00350 WHACK(item);
00351 continue;
00352 }
00353 #endif
00354
00355
00356 if (!healthy()) {
00357 WHACK(item);
00358 continue;
00359 }
00360 string_array classif = item->classifier();
00361
00362
00363
00364
00365
00366 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
00367 if (ret != tentacle::OKAY) {
00368 #ifdef DEBUG_CROMP_SERVER
00369 LOG(astring("failed to evaluate the infoton we got: ")
00370 + classif.text_form());
00371 #endif
00372
00373
00374
00375
00376
00377
00378
00380
00381 LOG(astring("injecting unhandled note into response stream for ")
00382 + req_id.text_form() + ", got outcome " + outcome_name(ret));
00383
00384 _parent.send_to_client(req_id,
00385 new unhandled_request(req_id, classif, ret));
00386
00387
00388
00390 }
00391 }
00392 return saw_something;
00393 }
00394
00395 private:
00396 cromp_server &_parent;
00397 octopus *_octo;
00398 octopus_entity _ent;
00399 bool _healthy;
00400 bool _fixated;
00401 cromp_data_grabber _grabber;
00402 infoton_list _waiting;
00403
00404 bool _still_connected;
00405
00406 login_tentacle &_security_arm;
00407 };
00408
00410
00411 void cromp_data_grabber::perform_activity(void *)
00412 {
00413 #ifdef DEBUG_CROMP_SERVER
00414 FUNCDEF("perform_activity");
00415 #endif
00416 while (!should_stop()) {
00417
00418 bool ret = _parent.handle_client_needs(*this);
00419
00420 if (!ret) {
00421
00422 #ifdef DEBUG_CROMP_SERVER
00423 LOG("done handling client needs.");
00424 #endif
00425 _octo->expunge(_parent.ent());
00426 break;
00427 }
00428 }
00429 }
00430
00432
00433 class cromp_client_list : public amorph<cromp_client_record>
00434 {
00435 public:
00436 int find(const octopus_entity &to_find) const {
00437 for (int i = 0; i < elements(); i++)
00438 if (to_find == get(i)->ent()) return i;
00439 return common::NOT_FOUND;
00440 }
00441 };
00442
00444
00445 class client_dropping_thread : public ethread
00446 {
00447 public:
00448 client_dropping_thread (cromp_server &parent)
00449 : ethread(DROPPING_INTERVAL),
00450 _parent(parent) {}
00451
00452 void perform_activity(void *formal(ptr)) {
00453
00454 _parent.drop_dead_clients();
00455 }
00456
00457 private:
00458 cromp_server &_parent;
00459 };
00460
00462
00463 class connection_management_thread : public ethread
00464 {
00465 public:
00466 connection_management_thread(cromp_server &parent)
00467 : ethread(),
00468 _parent(parent) {}
00469
00470 void perform_activity(void *formal(ptr)) {
00471
00472 _parent.look_for_clients(*this);
00473 }
00474
00475 private:
00476 cromp_server &_parent;
00477 };
00478
00480
00481 #undef LOCK_LISTS
00482 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
00483
00484
00485 cromp_server::cromp_server(const internet_address &where,
00486 int accepting_threads, bool instantaneous, int max_per_ent)
00487 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
00488 _clients(new cromp_client_list),
00489 _accepters(new thread_cabinet),
00490 _list_lock(new mutex),
00491 _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
00492 _instantaneous(instantaneous),
00493 _where(new internet_address(where)),
00494 _accepting_threads(accepting_threads),
00495 _dropper(new client_dropping_thread(*this)),
00496 _enabled(false),
00497 _encrypt_arm(NIL),
00498 _default_security(new cromp_security),
00499 _security_arm(NIL)
00500 {
00501
00502 }
00503
00504 cromp_server::~cromp_server()
00505 {
00506 disable_servers();
00507 WHACK(_accepters);
00508 WHACK(_dropper);
00509 WHACK(_clients);
00510 WHACK(_next_droppage);
00511 WHACK(_where);
00512 WHACK(_default_security);
00513 WHACK(_list_lock);
00514 _encrypt_arm = NIL;
00515 _security_arm = NIL;
00516 }
00517
00518 internet_address cromp_server::location() const { return *_where; }
00519
00520 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
00521 { return octo()->responses().get_sizes(id, items, bytes); }
00522
00523 internet_address cromp_server::any_address(int port)
00524 {
00525 const abyte any_list[] = { 0, 0, 0, 0 };
00526 return internet_address(byte_array(4, any_list), "", port);
00527 }
00528
00529 astring cromp_server::responses_text_form() const
00530 { return octo()->responses().text_form(); }
00531
00532 int cromp_server::DEFAULT_ACCEPTERS() {
00533
00534
00535 return 7;
00536 }
00537
00538 infoton *cromp_server::wrap_infoton(infoton * &request,
00539 const octopus_entity &ent)
00540 {
00541 FUNCDEF("wrap_infoton");
00542 if (!_enabled) return NIL;
00543
00544
00545
00546
00547
00548 if (dynamic_cast<identity_infoton *>(request)
00549 || dynamic_cast<encryption_infoton *>(request)
00550 || dynamic_cast<encryption_wrapper *>(request)) return NIL;
00551
00552 #ifdef DEBUG_CROMP_SERVER
00553 LOG(astring("encrypting ") + request->text_form());
00554 #endif
00555
00556 octenc_key_record *key = _encrypt_arm->keys().lock(ent);
00557
00558 if (!key) {
00559 LOG(astring("failed to locate key for entity ") + ent.text_form());
00560 return NIL;
00561 }
00562 byte_array packed_request;
00563 infoton::fast_pack(packed_request, *request);
00564 WHACK(request);
00565 encryption_wrapper *to_return = new encryption_wrapper;
00566 key->_key.encrypt(packed_request, to_return->_wrapped);
00567 _encrypt_arm->keys().unlock(key);
00568 return to_return;
00569 }
00570
00571 outcome cromp_server::enable_servers(bool encrypt, cromp_security *security)
00572 {
00573 FUNCDEF("enable_servers");
00574 if (encrypt) {
00575
00576 #ifdef DEBUG_CROMP_SERVER
00577 LOG(astring("enabling encryption for ") + class_name()
00578 + " on " + _where->text_form());
00579 #endif
00580 _encrypt_arm = new encryption_tentacle;
00581 add_tentacle(_encrypt_arm, true);
00582 add_tentacle(new unwrapping_tentacle, false);
00583 }
00584 WHACK(_security_arm);
00585 if (security) {
00586 _security_arm = new login_tentacle(*security);
00587 add_tentacle(_security_arm, true);
00588 } else {
00589 _security_arm = new login_tentacle(*_default_security);
00590 add_tentacle(_security_arm, true);
00591 }
00592 open_common(*_where);
00593
00594 _enabled = true;
00595
00596 outcome to_return = accept_one_client(false);
00597 if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
00598 LOG(astring("failure starting up server: ") + outcome_name(to_return));
00599 return to_return;
00600 }
00601
00602 #ifdef DEBUG_CROMP_SERVER
00603 LOG(a_sprintf("adding %d accepting threads.", _accepting_threads));
00604 #endif
00605 for (int i = 0; i < _accepting_threads; i++) {
00606
00607 _accepters->add_thread(new connection_management_thread(*this), true, NIL);
00608 }
00609
00610 _dropper->start(NIL);
00611 return OKAY;
00612 }
00613
00614 void cromp_server::disable_servers()
00615 {
00616
00617 if (!_enabled) return;
00618 _dropper->stop();
00619 _accepters->stop_all();
00620 if (_clients) {
00621 LOCK_LISTS;
00622
00623
00624 for (int i = 0; i < _clients->elements(); i++) {
00625
00626 cromp_client_record *cli = (*_clients)[i];
00627 if (cli) cli->croak();
00628 }
00629 }
00630
00631 close_common();
00632
00633
00634
00635
00636
00637
00638 WHACK(_clients);
00639
00640 _enabled = false;
00641 }
00642
00643 int cromp_server::clients() const
00644 {
00645 LOCK_LISTS;
00646 return _clients? _clients->elements() : 0;
00647 }
00648
00649 bool cromp_server::disconnect_entity(const octopus_entity &id)
00650 {
00651
00652 if (!_enabled) return false;
00653 LOCK_LISTS;
00654 int indy = _clients->find(id);
00655 if (negative(indy)) return false;
00656 cromp_client_record *cli = (*_clients)[indy];
00657
00658 cli->croak();
00659 return true;
00660 }
00661
00662 bool cromp_server::find_entity(const octopus_entity &id,
00663 internet_address &found)
00664 {
00665
00666 if (!_enabled) return false;
00667 found = internet_address();
00668 LOCK_LISTS;
00669 int indy = _clients->find(id);
00670 if (negative(indy)) return false;
00671 cromp_client_record *cli = (*_clients)[indy];
00672
00673 found = cli->spock()->remote();
00674 return true;
00675 }
00676
00677 outcome cromp_server::accept_one_client(bool wait)
00678 {
00679 #ifdef DEBUG_CROMP_SERVER
00680 FUNCDEF("accept_one_client");
00681 #endif
00682 if (!_enabled) return common::INCOMPLETE;
00683 spocket *accepted = NIL;
00684
00685 outcome ret = spock()->accept(accepted, wait);
00686
00687
00688 if ( (ret == spocket::OKAY) && accepted) {
00689
00690 cromp_client_record *adding = new cromp_client_record(*this, accepted,
00691 octo(), *_security_arm);
00692 #ifdef DEBUG_CROMP_SERVER
00693 LOG(a_sprintf("found a new client on sock %d.", accepted->OS_socket()));
00694 #endif
00695 LOCK_LISTS;
00696 _clients->append(adding);
00697 return OKAY;
00698 } else {
00699 if (ret == spocket::NO_CONNECTION)
00700 return NOT_FOUND;
00701 #ifdef DEBUG_CROMP_SERVER
00702 LOG(astring("error accepting client: ") + spocket::outcome_name(ret));
00703 #endif
00704 return DISALLOWED;
00705 }
00706 }
00707
00708 void cromp_server::look_for_clients(ethread &requestor)
00709 {
00710 FUNCDEF("look_for_clients");
00711 if (!_enabled) return;
00712
00713 while (!requestor.should_stop()) {
00714 outcome ret = accept_one_client(false);
00715 if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
00716
00717
00718 LOG(astring("got real error on socket; leaving for good.")
00719 + spocket::outcome_name(ret));
00720
00721 break;
00722 }
00723
00724
00725 if (ret != OKAY)
00726 time_control::sleep_ms(ACCEPTANCE_SNOOZE);
00727 }
00728 }
00729
00730 outcome cromp_server::send_to_client(const octopus_request_id &id,
00731 infoton *data)
00732 {
00733 #ifdef DEBUG_CROMP_SERVER
00734 FUNCDEF("send_to_client");
00735 #endif
00736 if (!_enabled) return common::INCOMPLETE;
00737 if (!octo()->responses().add_item(data, id)) {
00738 #ifdef DEBUG_CROMP_SERVER
00739 LOG("failed to store result for client--no space left currently.");
00740 #endif
00741 return TOO_FULL;
00742 }
00743 return OKAY;
00744 }
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761 outcome cromp_server::get_from_client(const octopus_request_id &id,
00762 infoton * &data, int timeout)
00763 {
00764
00765 if (!_enabled) return common::INCOMPLETE;
00766
00767
00768 LOCK_LISTS;
00769 int indy = _clients->find(id._entity);
00770 if (negative(indy)) return NOT_FOUND;
00771 cromp_client_record *cli = (*_clients)[indy];
00772 return cli->retrieve_and_restore(data, id, timeout);
00773 }
00774
00775 void cromp_server::drop_dead_clients()
00776 {
00777 #ifdef DEBUG_CROMP_SERVER
00778 FUNCDEF("drop_dead_clients");
00779 #endif
00780 if (!_enabled) return;
00781
00782
00783 {
00784 LOCK_LISTS;
00785 if (time_stamp() < *_next_droppage) return;
00786 }
00787
00788 LOCK_LISTS;
00789 for (int i = 0; i < _clients->elements(); i++) {
00790 cromp_client_record *cli = (*_clients)[i];
00791 if (!cli) {
00792 #ifdef DEBUG_CROMP_SERVER
00793 LOG(astring("error in list structure."));
00794 #endif
00795 _clients->zap(i, i);
00796 i--;
00797 continue;
00798 }
00799 if (!cli->still_connected() || !cli->healthy()) {
00800 #ifdef DEBUG_CROMP_SERVER
00801 LOG(astring("dropping disconnected client ") + cli->ent().mangled_form());
00802 #endif
00803 cli->croak();
00804
00805
00806 _clients->zap(i, i);
00807 i--;
00808 continue;
00809 }
00810 }
00811
00812 _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
00813 }
00814
00815 }
00816