00001 #ifndef CROMP_SERVER_IMPLEMENTATION_FILE
00002 #define CROMP_SERVER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "cromp_common.h"
00019 #include "cromp_security.h"
00020 #include "cromp_server.h"
00021
00022 #include <basis/function.h>
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/portable.h>
00026 #include <basis/mutex.h>
00027 #include <data_struct/amorph.h>
00028 #include <data_struct/unique_id.h>
00029 #include <mechanisms/ithread.h>
00030 #include <mechanisms/thread_cabinet.h>
00031 #include <octopus/entity_data_bin.h>
00032 #include <octopus/entity_defs.h>
00033 #include <octopus/identity_infoton.h>
00034 #include <octopus/infoton.h>
00035 #include <octopus/tentacle.h>
00036 #include <octopus/unhandled_request.h>
00037 #include <sockets/address.h>
00038 #include <sockets/tcpip_stack.h>
00039 #include <sockets/spocket.h>
00040 #include <tentacles/key_repository.h>
00041 #include <tentacles/login_tentacle.h>
00042
00043 #ifndef OMIT_CRYPTO_SUPPORT
00044 #include <tentacles/encryption_tentacle.h>
00045 #include <tentacles/encryption_wrapper.h>
00046 #endif
00047
00048
00049
00050
00051 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
00052
00053
00054 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
00055
00056
00057
00058 const int SEND_TRIES_ALLOWED = 1;
00059
00060
00061 const int SEND_THRESHOLD = 512 * KILOBYTE;
00062
00063
00064
00065 const int EXTREME_SEND_TRIES_ALLOWED = 28;
00066
00067
00068 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
00069
00070
00071 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
00072
00073
00074 const int DROPPING_INTERVAL = 500;
00075
00076
00077 const int DATA_AWAIT_TIMEOUT = 14;
00078
00079
00080 const int ACCEPTANCE_SNOOZE = 60;
00081
00082
00083 #undef LOG
00084 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
00085
00087
00088
00089 class cromp_client_record;
00090
00091 class cromp_data_grabber : public ithread
00092 {
00093 public:
00094 cromp_data_grabber(cromp_client_record &parent, octopus *octo)
00095 : ithread(), _parent(parent), _octo(octo) {}
00096
00097 IMPLEMENT_CLASS_NAME("cromp_data_grabber");
00098
00099 virtual void perform_activity(void *);
00100
00101 private:
00102 cromp_client_record &_parent;
00103 octopus *_octo;
00104 };
00105
00107
00108 class cromp_client_record : public cromp_common
00109 {
00110 public:
00111 cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
00112 login_tentacle &security)
00113 : cromp_common(client, octo),
00114 _parent(parent),
00115 _octo(octo),
00116 _ent(),
00117 _healthy(true),
00118 _fixated(false),
00119 _grabber(*this, octo),
00120 _waiting(),
00121 _still_connected(true),
00122 _security_arm(security)
00123 {
00124 internet_address local_addr = internet_address
00125 (internet_address::localhost(), client->stack().hostname(), 0);
00126 open_common(local_addr);
00127 _grabber.start(NIL);
00128 }
00129
00130 ~cromp_client_record() {
00131 croak();
00132 }
00133
00134 IMPLEMENT_CLASS_NAME("cromp_client_record");
00135
00136 bool handle_client_needs(ithread &prompter) {
00137 FUNCDEF("handle_client_needs");
00138 #ifdef DEBUG_CROMP_SERVER
00139 time_stamp start;
00140 #endif
00141 if (!_healthy) return false;
00142 if (!spock()->connected()) {
00143 _still_connected = false;
00144 return false;
00145 }
00146 bool keep_going = true;
00147 int actions = 0;
00148 while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
00149
00150
00151 if (prompter.should_stop()) return false;
00152 keep_going = false;
00153 bool ret = get_incoming_data(actions);
00154 if (ret) keep_going = true;
00155 ret = push_client_replies(actions);
00156 if (ret) keep_going = true;
00157 }
00158
00159
00160 #ifdef DEBUG_CROMP_SERVER
00161 if (actions > 10) {
00162 LOG(isprintf("actions=%d", actions));
00163 LOG(isprintf("%d pending send bytes, %d bytes accumulated, bin has "
00164 "%d items.", pending_sends(), accumulated_bytes(),
00165 octo()->responses().items_held()));
00166 }
00167
00168 int duration = int(time_stamp().value() - start.value());
00169 if (duration > 200) {
00170 LOG(isprintf("duration=%d ms.", duration));
00171 }
00172 #endif
00173
00174 return true;
00175 }
00176
00177 const octopus_entity &ent() const { return _ent; }
00178
00179
00180
00181 void croak() {
00182 FUNCDEF("croak");
00183 _grabber.stop();
00184 int actions = 0;
00185 while (get_incoming_data(actions)) {
00186
00187
00188 }
00189 _healthy = false;
00190
00191 _security_arm.expunge(_ent);
00192 close_common();
00193 }
00194
00195 bool healthy() const { return _healthy; }
00196
00197
00198 bool still_connected() const { return _still_connected; }
00199
00200
00201 cromp_server &parent() const { return _parent; }
00202
00203 bool push_client_replies(int &actions) {
00204 FUNCDEF("push_client_replies");
00205 if (!healthy()) return false;
00206 if (ent().blank()) {
00207
00208 #ifdef DEBUG_CROMP_SERVER
00209 LOG("not pushing replies for blank.");
00210 #endif
00211 return false;
00212 }
00213
00214 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00215 LOG("buffer clog being cleared now.");
00216
00217 push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
00218
00219 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00220 LOG("could not completely clear buffer clog.");
00221 return true;
00222 }
00223 LOG("cleared out buffer clog.");
00224 }
00225
00226 int any_left = true;
00227 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00228
00229 if (!_octo->responses().items_held()) {
00230 any_left = false;
00231 break;
00232 }
00233
00234 grab_anything(false);
00235
00236 int num_located = _octo->responses().acquire_for_entity(ent(),
00237 _waiting, MAXIMUM_SIZE_BATCH);
00238 if (!num_located) {
00239 any_left = false;
00240 break;
00241 }
00242
00243 #ifndef OMIT_CRYPTO_SUPPORT
00244
00245 if (_parent.encrypting()) {
00246 for (int i = 0; i < _waiting.elements(); i++) {
00247 infoton *curr = _waiting[i]->_data;
00248 infoton *processed = _parent.wrap_infoton(curr,
00249 _waiting[i]->_id._entity);
00250 if (processed) _waiting[i]->_data = processed;
00251 }
00252 }
00253 #endif
00254
00255 outcome ret = pack_and_ship(_waiting, 0);
00256
00257 if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
00258
00259
00260
00261 LOG(istring("failed to send package back to client: ")
00262 + cromp_common::outcome_name(ret));
00263
00264 any_left = false;
00265 break;
00266 }
00267
00268 if (pending_sends() > SEND_THRESHOLD) {
00269 #ifdef DEBUG_CROMP_SERVER
00270 LOG(istring("over sending threshold on ") + _ent.text_form());
00271 #endif
00272 push_outgoing(SEND_TRIES_ALLOWED);
00273 }
00274
00275 }
00276
00277 push_outgoing(SEND_TRIES_ALLOWED);
00278 if (!spock()->connected()) {
00279 #ifdef DEBUG_CROMP_SERVER
00280 LOG("noticed disconnection of client.");
00281 #endif
00282 _still_connected = false;
00283 }
00284 return any_left;
00285 }
00286
00287 bool get_incoming_data(int &actions) {
00288 FUNCDEF("get_incoming_data");
00289 if (!healthy()) return false;
00290 int first_one = true;
00291 bool saw_something = false;
00292 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00293
00294 infoton *item = NIL;
00295 octopus_request_id req_id;
00296 outcome ret = retrieve_and_restore_any(item, req_id,
00297 first_one? DATA_AWAIT_TIMEOUT : 0);
00298 first_one = false;
00299 if (ret == cromp_common::TIMED_OUT) {
00300 actions--;
00301 return false;
00302 } else if (ret != cromp_common::OKAY) {
00303 #ifdef DEBUG_CROMP_SERVER
00304 LOG(istring("got error ") + cromp_common::outcome_name(ret));
00305 #endif
00306 if (ret == cromp_common::NO_CONNECTION) {
00307 #ifdef DEBUG_CROMP_SERVER
00308 LOG("noticed disconnection of client.");
00309 #endif
00310 _still_connected = false;
00311 }
00312 actions--;
00313 return false;
00314 }
00315
00316 saw_something = true;
00317 if (!_fixated) {
00318 if (req_id._entity.blank()) {
00319 LOG(istring("would have assigned ours to blank id! ")
00320 + req_id._entity.mangled_form());
00321 WHACK(item);
00322 continue;
00323 }
00324 #ifdef DEBUG_CROMP_SERVER
00325 LOG(istring("cmd with entity ") + req_id._entity.mangled_form());
00326 #endif
00327 if (_ent.blank()) {
00328
00329 _ent = req_id._entity;
00330 #ifdef DEBUG_CROMP_SERVER
00331 LOG(istring("assigned own entity to ") + _ent.mangled_form());
00332 #endif
00333 } else if (!_fixated && (_ent != req_id._entity) ) {
00334 #ifdef DEBUG_CROMP_SERVER
00335 LOG(istring("fixated on entity of ") + req_id._entity.mangled_form()
00336 + " where we used to have " + _ent.mangled_form());
00337 #endif
00338 _ent = req_id._entity;
00339 _fixated = true;
00340 }
00341 }
00342 #ifdef DEBUG_CROMP_SERVER
00343 else if (_ent != req_id._entity) {
00344
00345 #ifdef DEBUG_CROMP_SERVER
00346 LOG(istring("seeing wrong entity of ") + req_id._entity.mangled_form()
00347 + " when we fixated on " + _ent.mangled_form());
00348 #endif
00349 WHACK(item);
00350 continue;
00351 }
00352 #endif
00353
00354
00355 if (!healthy()) {
00356 WHACK(item);
00357 continue;
00358 }
00359 string_array classif = item->classifier();
00360
00361
00362
00363
00364
00365 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
00366 if (ret != tentacle::OKAY) {
00367 #ifdef DEBUG_CROMP_SERVER
00368 LOG(istring("failed to evaluate the infoton we got: ")
00369 + classif.text_form());
00370 #endif
00371
00372
00373
00374
00375
00376
00377
00379
00380 LOG(istring("injecting unhandled note into response stream for ")
00381 + req_id.text_form() + ", got outcome " + outcome_name(ret));
00382
00383 _parent.send_to_client(req_id,
00384 new unhandled_request(req_id, classif, ret));
00385
00386
00387
00389 }
00390 }
00391 return saw_something;
00392 }
00393
00394 private:
00395 cromp_server &_parent;
00396 octopus *_octo;
00397 octopus_entity _ent;
00398 bool _healthy;
00399 bool _fixated;
00400 cromp_data_grabber _grabber;
00401 infoton_list _waiting;
00402
00403 bool _still_connected;
00404
00405 login_tentacle &_security_arm;
00406 };
00407
00409
00410 void cromp_data_grabber::perform_activity(void *)
00411 {
00412 FUNCDEF("perform_activity");
00413 while (!should_stop()) {
00414
00415 bool ret = _parent.handle_client_needs(*this);
00416
00417 if (!ret) {
00418
00419 #ifdef DEBUG_CROMP_SERVER
00420 LOG("done handling client needs.");
00421 #endif
00422 _octo->expunge(_parent.ent());
00423 break;
00424 }
00425 }
00426 }
00427
00429
00430 class cromp_client_list : public amorph<cromp_client_record>
00431 {
00432 public:
00433 int find(const octopus_entity &to_find) const {
00434 for (int i = 0; i < elements(); i++)
00435 if (to_find == get(i)->ent()) return i;
00436 return common::NOT_FOUND;
00437 }
00438 };
00439
00441
00442 class client_dropping_thread : public ithread
00443 {
00444 public:
00445 client_dropping_thread (cromp_server &parent)
00446 : ithread(DROPPING_INTERVAL),
00447 _parent(parent) {}
00448
00449 void perform_activity(void *formal(ptr)) {
00450 FUNCDEF("perform_activity");
00451 _parent.drop_dead_clients();
00452 }
00453
00454 private:
00455 cromp_server &_parent;
00456 };
00457
00459
00460 class connection_management_thread : public ithread
00461 {
00462 public:
00463 connection_management_thread(cromp_server &parent)
00464 : ithread(),
00465 _parent(parent) {}
00466
00467 void perform_activity(void *formal(ptr)) {
00468 FUNCDEF("perform_activity");
00469 _parent.look_for_clients(*this);
00470 }
00471
00472 private:
00473 cromp_server &_parent;
00474 };
00475
00477
00478 #undef LOCK_LISTS
00479 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
00480
00481
00482 cromp_server::cromp_server(const internet_address &where,
00483 int accepting_threads, bool instantaneous, int max_per_ent)
00484 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
00485 _clients(new cromp_client_list),
00486 _accepters(new thread_cabinet),
00487 _list_lock(new mutex),
00488 _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
00489 _instantaneous(instantaneous),
00490 _where(new internet_address(where)),
00491 _accepting_threads(accepting_threads),
00492 _dropper(new client_dropping_thread(*this)),
00493 _enabled(false),
00494 _encrypt_arm(NIL),
00495 _default_security(new cromp_security),
00496 _security_arm(NIL)
00497 {
00498 FUNCDEF("constructor");
00499 }
00500
00501 cromp_server::~cromp_server()
00502 {
00503 disable_servers();
00504 WHACK(_accepters);
00505 WHACK(_dropper);
00506 WHACK(_clients);
00507 WHACK(_next_droppage);
00508 WHACK(_where);
00509 WHACK(_default_security);
00510 WHACK(_list_lock);
00511 _encrypt_arm = NIL;
00512 _security_arm = NIL;
00513 }
00514
00515 internet_address cromp_server::location() const { return *_where; }
00516
00517 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
00518 { return octo()->responses().get_sizes(id, items, bytes); }
00519
00520 internet_address cromp_server::any_address(int port)
00521 {
00522 const byte any_list[] = { 0, 0, 0, 0 };
00523 return internet_address(byte_array(4, any_list), "", port);
00524 }
00525
00526 istring cromp_server::responses_text_form() const
00527 { return octo()->responses().text_form(); }
00528
00529 int cromp_server::DEFAULT_ACCEPTERS() {
00530
00531
00532 #ifdef EMBEDDED_BUILD
00533 return 1;
00534 #else
00535 return 7;
00536 #endif
00537 }
00538
00539 #ifndef OMIT_CRYPTO_SUPPORT
00540 infoton *cromp_server::wrap_infoton(infoton * &request,
00541 const octopus_entity &ent)
00542 {
00543 FUNCDEF("wrap_infoton");
00544 if (!_enabled) return NIL;
00545
00546
00547
00548
00549
00550 if (dynamic_cast<identity_infoton *>(request)
00551 || dynamic_cast<encryption_infoton *>(request)
00552 || dynamic_cast<encryption_wrapper *>(request)) return NIL;
00553
00554 #ifdef DEBUG_CROMP_SERVER
00555 LOG(istring("encrypting ") + request->text_form());
00556 #endif
00557
00558 octenc_key_record *key = _encrypt_arm->keys().lock(ent);
00559
00560 if (!key) {
00561 LOG(istring("failed to locate key for entity ") + ent.text_form());
00562 return NIL;
00563 }
00564 byte_array packed_request;
00565 infoton::fast_pack(packed_request, *request);
00566 WHACK(request);
00567 encryption_wrapper *to_return = new encryption_wrapper;
00568 key->_key.encrypt(packed_request, to_return->_wrapped);
00569 _encrypt_arm->keys().unlock(key);
00570 return to_return;
00571 }
00572 #endif
00573
00574 outcome cromp_server::enable_servers(bool encrypt, cromp_security *security)
00575 {
00576 FUNCDEF("enable_servers");
00577 if (encrypt) {
00578 #ifndef OMIT_CRYPTO_SUPPORT
00579
00580 #ifdef DEBUG_CROMP_SERVER
00581 LOG(istring("enabling encryption for ") + class_name()
00582 + " on " + _where->text_form());
00583 #endif
00584 _encrypt_arm = new encryption_tentacle;
00585 add_tentacle(_encrypt_arm, true);
00586 add_tentacle(new unwrapping_tentacle, false);
00587 #endif
00588 }
00589 WHACK(_security_arm);
00590 if (security) {
00591 _security_arm = new login_tentacle(*security);
00592 add_tentacle(_security_arm, true);
00593 } else {
00594 _security_arm = new login_tentacle(*_default_security);
00595 add_tentacle(_security_arm, true);
00596 }
00597 open_common(*_where);
00598
00599 _enabled = true;
00600 outcome to_return = accept_one_client(false);
00601 if (to_return != common::NOT_FOUND) {
00602 LOG(istring("failure starting up server: ") + outcome_name(to_return));
00603 return to_return;
00604 }
00605
00606 #ifdef DEBUG_CROMP_SERVER
00607 LOG(isprintf("adding %d accepting threads.", _accepting_threads));
00608 #endif
00609 for (int i = 0; i < _accepting_threads; i++) {
00610
00611 _accepters->add_thread(new connection_management_thread(*this), true, NIL);
00612 }
00613
00614 _dropper->start(NIL);
00615 return OKAY;
00616 }
00617
00618 void cromp_server::disable_servers()
00619 {
00620 FUNCDEF("disable_servers");
00621 if (!_enabled) return;
00622 _dropper->stop();
00623 _accepters->stop_all();
00624 if (_clients) {
00625 LOCK_LISTS;
00626
00627
00628 for (int i = 0; i < _clients->elements(); i++) {
00629
00630 cromp_client_record *cli = (*_clients)[i];
00631 if (cli) cli->croak();
00632 }
00633 }
00634
00635 close_common();
00636
00637
00638
00639
00640
00641
00642 WHACK(_clients);
00643
00644 _enabled = false;
00645 }
00646
00647 int cromp_server::clients() const
00648 {
00649 LOCK_LISTS;
00650 return _clients? _clients->elements() : 0;
00651 }
00652
00653 bool cromp_server::disconnect_entity(const octopus_entity &id)
00654 {
00655 FUNCDEF("disconnect_entity");
00656 if (!_enabled) return false;
00657 LOCK_LISTS;
00658 int indy = _clients->find(id);
00659 if (negative(indy)) return false;
00660 cromp_client_record *cli = (*_clients)[indy];
00661
00662 cli->croak();
00663 return true;
00664 }
00665
00666 bool cromp_server::find_entity(const octopus_entity &id,
00667 internet_address &found)
00668 {
00669 FUNCDEF("find_entity");
00670 if (!_enabled) return false;
00671 found = internet_address();
00672 LOCK_LISTS;
00673 int indy = _clients->find(id);
00674 if (negative(indy)) return false;
00675 cromp_client_record *cli = (*_clients)[indy];
00676
00677 found = cli->spock()->remote();
00678 return true;
00679 }
00680
00681 outcome cromp_server::accept_one_client(bool wait)
00682 {
00683 FUNCDEF("accept_one_client");
00684 if (!_enabled) return common::INCOMPLETE;
00685 spocket *accepted = NIL;
00686
00687 outcome ret = spock()->accept(accepted, wait);
00688
00689
00690 if ( (ret == spocket::OKAY) && accepted) {
00691
00692 cromp_client_record *adding = new cromp_client_record(*this, accepted,
00693 octo(), *_security_arm);
00694 #ifdef DEBUG_CROMP_SERVER
00695 LOG(isprintf("found a new client on sock %d.", accepted->OS_socket()));
00696 #endif
00697 LOCK_LISTS;
00698 _clients->append(adding);
00699 return OKAY;
00700 } else {
00701 if (ret == spocket::NO_CONNECTION)
00702 return NOT_FOUND;
00703 #ifdef DEBUG_CROMP_SERVER
00704 LOG(istring("error accepting client: ") + spocket::outcome_name(ret));
00705 #endif
00706 return DISALLOWED;
00707 }
00708 }
00709
00710 void cromp_server::look_for_clients(ithread &requestor)
00711 {
00712 FUNCDEF("look_for_clients");
00713 if (!_enabled) return;
00714
00715 while (!requestor.should_stop()) {
00716 outcome ret = accept_one_client(false);
00717 if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
00718
00719
00720 LOG(istring("got real error on socket; leaving for good.")
00721 + spocket::outcome_name(ret));
00722
00723 break;
00724 }
00725
00726
00727 if (ret != OKAY)
00728 portable::sleep_ms(ACCEPTANCE_SNOOZE);
00729 }
00730 }
00731
00732 outcome cromp_server::send_to_client(const octopus_request_id &id,
00733 infoton *data)
00734 {
00735 FUNCDEF("send_to_client");
00736 if (!_enabled) return common::INCOMPLETE;
00737 if (!octo()->responses().add_item(data, id)) {
00738 #ifdef DEBUG_CROMP_SERVER
00739 LOG("failed to store result for client--no space left currently.");
00740 #endif
00741 return TOO_FULL;
00742 }
00743 return OKAY;
00744 }
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761 outcome cromp_server::get_from_client(const octopus_request_id &id,
00762 infoton * &data, int timeout)
00763 {
00764 FUNCDEF("get_from_client");
00765 if (!_enabled) return common::INCOMPLETE;
00766
00767
00768 LOCK_LISTS;
00769 int indy = _clients->find(id._entity);
00770 if (negative(indy)) return NOT_FOUND;
00771 cromp_client_record *cli = (*_clients)[indy];
00772 return cli->retrieve_and_restore(data, id, timeout);
00773 }
00774
00775 void cromp_server::drop_dead_clients()
00776 {
00777 FUNCDEF("drop_dead_clients");
00778 if (!_enabled) return;
00779
00780
00781 {
00782 LOCK_LISTS;
00783 if (time_stamp() < *_next_droppage) return;
00784 }
00785
00786 LOCK_LISTS;
00787 for (int i = 0; i < _clients->elements(); i++) {
00788 cromp_client_record *cli = (*_clients)[i];
00789 if (!cli) {
00790 #ifdef DEBUG_CROMP_SERVER
00791 LOG(istring("error in list structure."));
00792 #endif
00793 _clients->zap(i, i);
00794 i--;
00795 continue;
00796 }
00797 if (!cli->still_connected() || !cli->healthy()) {
00798 #ifdef DEBUG_CROMP_SERVER
00799 LOG(istring("dropping disconnected client ") + cli->ent().mangled_form());
00800 #endif
00801 cli->croak();
00802
00803
00804 _clients->zap(i, i);
00805 i--;
00806 continue;
00807 }
00808 }
00809
00810 _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
00811 }
00812
00813
00814 #endif //CROMP_SERVER_IMPLEMENTATION_FILE
00815