00001 #ifndef CROMP_SERVER_IMPLEMENTATION_FILE
00002 #define CROMP_SERVER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "cromp_common.h"
00019 #include "cromp_security.h"
00020 #include "cromp_server.h"
00021
00022 #include <basis/function.h>
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/portable.h>
00026 #include <basis/mutex.h>
00027 #include <data_struct/amorph.cpp>
00028 #include <data_struct/unique_id.h>
00029 #include <mechanisms/ithread.h>
00030 #include <mechanisms/thread_cabinet.h>
00031 #include <octopus/entity_data_bin.h>
00032 #include <octopus/entity_defs.h>
00033 #include <octopus/identity_infoton.h>
00034 #include <octopus/infoton.h>
00035 #include <octopus/tentacle.h>
00036 #include <octopus/unhandled_request.h>
00037 #include <sockets/address.h>
00038 #include <sockets/tcpip_stack.h>
00039 #include <sockets/spocket.h>
00040 #include <tentacles/key_repository.h>
00041 #include <tentacles/login_tentacle.h>
00042
00043 #ifndef OMIT_CRYPTO_SUPPORT
00044 #include <tentacles/encryption_tentacle.h>
00045 #include <tentacles/encryption_wrapper.h>
00046 #endif
00047
00048
00049
00050
00051 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
00052
00053
00054 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
00055
00056
00057
00058 const int SEND_TRIES_ALLOWED = 1;
00059
00060
00061 const int SEND_THRESHOLD = 512 * KILOBYTE;
00062
00063
00064
00065 const int EXTREME_SEND_TRIES_ALLOWED = 28;
00066
00067
00068 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
00069
00070
00071 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
00072
00073
00074 const int DROPPING_INTERVAL = 500;
00075
00076
00077 const int DATA_AWAIT_TIMEOUT = 14;
00078
00079
00080 const int ACCEPTANCE_SNOOZE = 60;
00081
00082
00083 #undef LOG
00084 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
00085
00087
00088
00089 class cromp_client_record;
00090
00091 class cromp_data_grabber : public ithread
00092 {
00093 public:
00094 cromp_data_grabber(cromp_client_record &parent, octopus *octo)
00095 : ithread(), _parent(parent), _octo(octo) {}
00096
00097 IMPLEMENT_CLASS_NAME("cromp_data_grabber");
00098
00099 virtual void perform_activity(void *);
00100
00101 private:
00102 cromp_client_record &_parent;
00103 octopus *_octo;
00104 };
00105
00107
00108 class cromp_client_record : public cromp_common
00109 {
00110 public:
00111 cromp_client_record(cromp_server &parent, spocket *client, octopus *octo)
00112 : cromp_common(client, octo),
00113 _parent(parent),
00114 _octo(octo),
00115 _ent(),
00116 _healthy(true),
00117 _fixated(false),
00118 _grabber(*this, octo),
00119 _waiting(),
00120 _still_connected(true)
00121 {
00122 internet_address local_addr = internet_address
00123 (internet_address::localhost(), client->stack().hostname(), 0);
00124 open_common(local_addr);
00125 _grabber.start(NIL);
00126 }
00127
00128 ~cromp_client_record() {
00129 croak();
00130 }
00131
00132 IMPLEMENT_CLASS_NAME("cromp_client_record");
00133
00134 bool handle_client_needs(ithread &prompter) {
00135 FUNCDEF("handle_client_needs");
00136 #ifdef DEBUG_CROMP_SERVER
00137 time_stamp start;
00138 #endif
00139 if (!_healthy) return false;
00140 if (!spock()->connected()) {
00141 _still_connected = false;
00142 return false;
00143 }
00144 bool keep_going = true;
00145 int actions = 0;
00146 while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
00147
00148
00149 if (prompter.should_stop()) return false;
00150 keep_going = false;
00151 bool ret = get_incoming_data(actions);
00152 if (ret) keep_going = true;
00153 ret = push_client_replies(actions);
00154 if (ret) keep_going = true;
00155 }
00156
00157
00158 #ifdef DEBUG_CROMP_SERVER
00159 if (actions > 10) {
00160 LOG(isprintf("actions=%d", actions));
00161 LOG(isprintf("%d pending send bytes, %d bytes accumulated, bin has "
00162 "%d items.", pending_sends(), accumulated_bytes(),
00163 octo()->responses().items_held()));
00164
00165 int duration = int(time_stamp().value() - start.value());
00166 if (duration > 200) {
00167 LOG(isprintf("duration=%d ms.", duration));
00168 }
00169 #endif
00170
00171 return true;
00172 }
00173
00174 const octopus_entity &ent() const { return _ent; }
00175
00176
00177
00178 void croak() {
00179 FUNCDEF("croak");
00180 _grabber.stop();
00181 int actions = 0;
00182 while (get_incoming_data(actions)) {
00183
00184
00185 }
00186 _healthy = false;
00187 close_common();
00188 }
00189
00190 bool healthy() const { return _healthy; }
00191
00192
00193 bool still_connected() const { return _still_connected; }
00194
00195
00196 cromp_server &parent() const { return _parent; }
00197
00198 bool push_client_replies(int &actions) {
00199 FUNCDEF("push_client_replies");
00200 if (!healthy()) return false;
00201 if (ent().blank()) {
00202
00203 #ifdef DEBUG_CROMP_SERVER
00204 LOG("not pushing replies for blank.");
00205 #endif
00206 return false;
00207 }
00208
00209 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00210 LOG("buffer clog being cleared now.");
00211
00212 push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
00213
00214 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00215 LOG("could not completely clear buffer clog.");
00216 return true;
00217 }
00218 LOG("cleared out buffer clog.");
00219 }
00220
00221 int any_left = true;
00222 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00223
00224 if (!_octo->responses().items_held()) {
00225 any_left = false;
00226 break;
00227 }
00228
00229 grab_anything(false);
00230
00231 int num_located = _octo->responses().acquire_for_entity(ent(),
00232 _waiting, MAXIMUM_SIZE_BATCH);
00233 if (!num_located) {
00234 any_left = false;
00235 break;
00236 }
00237
00238 #ifndef OMIT_CRYPTO_SUPPORT
00239
00240 if (_parent.encrypting()) {
00241 for (int i = 0; i < _waiting.elements(); i++) {
00242 infoton *curr = _waiting[i]->_data;
00243 infoton *processed = _parent.wrap_infoton(curr,
00244 _waiting[i]->_id._entity);
00245 if (processed) _waiting[i]->_data = processed;
00246 }
00247 }
00248 #endif
00249
00250 outcome ret = pack_and_ship(_waiting, 0);
00251
00252 if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
00253
00254
00255
00256 LOG(istring("failed to send package back to client: ")
00257 + cromp_common::outcome_name(ret));
00258
00259 any_left = false;
00260 break;
00261 }
00262
00263 if (pending_sends() > SEND_THRESHOLD) {
00264 #ifdef DEBUG_CROMP_SERVER
00265 LOG(istring("over sending threshold on ") + _ent.text_form());
00266 #endif
00267 push_outgoing(SEND_TRIES_ALLOWED);
00268 }
00269
00270 }
00271
00272 push_outgoing(SEND_TRIES_ALLOWED);
00273 if (!spock()->connected()) {
00274 #ifdef DEBUG_CROMP_SERVER
00275 LOG("noticed disconnection of client.");
00276 #endif
00277 _still_connected = false;
00278 }
00279 return any_left;
00280 }
00281
00282 bool get_incoming_data(int &actions) {
00283 FUNCDEF("get_incoming_data");
00284 if (!healthy()) return false;
00285 int first_one = true;
00286 bool saw_something = false;
00287 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00288
00289 infoton *item = NIL;
00290 octopus_request_id req_id;
00291 outcome ret = retrieve_and_restore_any(item, req_id,
00292 first_one? DATA_AWAIT_TIMEOUT : 0);
00293 first_one = false;
00294 if (ret == cromp_common::TIMED_OUT) {
00295 actions--;
00296 return false;
00297 } else if (ret != cromp_common::OKAY) {
00298 #ifdef DEBUG_CROMP_SERVER
00299 LOG(istring("got error ") + cromp_common::outcome_name(ret));
00300 #endif
00301 if (ret == cromp_common::NO_CONNECTION) {
00302 #ifdef DEBUG_CROMP_SERVER
00303 LOG("noticed disconnection of client.");
00304 #endif
00305 _still_connected = false;
00306 }
00307 actions--;
00308 return false;
00309 }
00310
00311 saw_something = true;
00312 if (!_fixated) {
00313 if (req_id._entity.blank()) {
00314 LOG(istring("would have assigned ours to blank id! ")
00315 + req_id._entity.mangled_form());
00316 WHACK(item);
00317 continue;
00318 }
00319 #ifdef DEBUG_CROMP_SERVER
00320 LOG(istring("cmd with entity ") + req_id._entity.mangled_form());
00321 #endif
00322 if (_ent.blank()) {
00323
00324 _ent = req_id._entity;
00325 #ifdef DEBUG_CROMP_SERVER
00326 LOG(istring("assigned own entity to ") + _ent.mangled_form());
00327 #endif
00328 } else if (!_fixated && (_ent != req_id._entity) ) {
00329 #ifdef DEBUG_CROMP_SERVER
00330 LOG(istring("fixated on entity of ") + req_id._entity.mangled_form()
00331 + " where we used to have " + _ent.mangled_form());
00332 #endif
00333 _ent = req_id._entity;
00334 _fixated = true;
00335 }
00336 }
00337 #ifdef DEBUG_CROMP_SERVER
00338 else if (_ent != req_id._entity) {
00339
00340 #ifdef DEBUG_CROMP_SERVER
00341 LOG(istring("seeing wrong entity of ") + req_id._entity.mangled_form()
00342 + " when we fixated on " + _ent.mangled_form());
00343 #endif
00344 WHACK(item);
00345 continue;
00346 }
00347 #endif
00348
00349
00350 if (!healthy()) {
00351 WHACK(item);
00352 continue;
00353 }
00354 string_array classif = item->classifier();
00355
00356
00357
00358
00359
00360 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
00361 if (ret != tentacle::OKAY) {
00362 #ifdef DEBUG_CROMP_SERVER
00363 LOG(istring("failed to evaluate the infoton we got: ")
00364 + classif.text_form());
00365 #endif
00366
00367
00368
00369
00370
00371
00372
00374
00375 LOG(istring("injecting unhandled note into response stream for ")
00376 + req_id.text_form() + ", got outcome " + outcome_name(ret));
00377
00378 _parent.send_to_client(req_id,
00379 new unhandled_request(req_id, classif, ret));
00380
00381
00382
00384 }
00385 }
00386 return saw_something;
00387 }
00388
00389 private:
00390 cromp_server &_parent;
00391 octopus *_octo;
00392 octopus_entity _ent;
00393 bool _healthy;
00394 bool _fixated;
00395 cromp_data_grabber _grabber;
00396 infoton_list _waiting;
00397
00398 bool _still_connected;
00399
00400 };
00401
00403
00404 void cromp_data_grabber::perform_activity(void *)
00405 {
00406 FUNCDEF("perform_activity");
00407 while (!should_stop()) {
00408
00409 bool ret = _parent.handle_client_needs(*this);
00410
00411 if (!ret) {
00412
00413 #ifdef DEBUG_CROMP_SERVER
00414 LOG("done handling client needs.");
00415 #endif
00416 _octo->expunge(_parent.ent());
00417 break;
00418 }
00419 }
00420 }
00421
00423
00424 class cromp_client_list : public amorph<cromp_client_record>
00425 {
00426 public:
00427 int find(const octopus_entity &to_find) const {
00428 for (int i = 0; i < elements(); i++)
00429 if (to_find == get(i)->ent()) return i;
00430 return common::NOT_FOUND;
00431 }
00432 };
00433
00435
00436 class client_dropping_thread : public ithread
00437 {
00438 public:
00439 client_dropping_thread (cromp_server &parent)
00440 : ithread(DROPPING_INTERVAL),
00441 _parent(parent) {}
00442
00443 void perform_activity(void *formal(ptr)) {
00444 FUNCDEF("perform_activity");
00445 _parent.drop_dead_clients();
00446 }
00447
00448 private:
00449 cromp_server &_parent;
00450 };
00451
00453
00454 class connection_management_thread : public ithread
00455 {
00456 public:
00457 connection_management_thread(cromp_server &parent)
00458 : ithread(),
00459 _parent(parent) {}
00460
00461 void perform_activity(void *formal(ptr)) {
00462 FUNCDEF("perform_activity");
00463 _parent.look_for_clients(*this);
00464 }
00465
00466 private:
00467 cromp_server &_parent;
00468 };
00469
00471
00472 #undef LOCK_LISTS
00473 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
00474
00475
00476 cromp_server::cromp_server(const internet_address &where,
00477 int accepting_threads, bool instantaneous, int max_per_ent)
00478 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
00479 _clients(new cromp_client_list),
00480 _accepters(new thread_cabinet),
00481 _list_lock(new mutex),
00482 _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
00483 _instantaneous(instantaneous),
00484 _where(new internet_address(where)),
00485 _accepting_threads(accepting_threads),
00486 _dropper(new client_dropping_thread(*this)),
00487 _enabled(false),
00488 _encrypt_arm(NIL),
00489 _default_security(new cromp_security)
00490 {
00491 FUNCDEF("constructor");
00492 }
00493
00494 cromp_server::~cromp_server()
00495 {
00496 disable_servers();
00497 _encrypt_arm = NIL;
00498 WHACK(_accepters);
00499 WHACK(_dropper);
00500 WHACK(_clients);
00501 WHACK(_next_droppage);
00502 WHACK(_where);
00503 WHACK(_default_security);
00504 WHACK(_list_lock);
00505 }
00506
00507 internet_address cromp_server::location() const { return *_where; }
00508
00509 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
00510 { return octo()->responses().get_sizes(id, items, bytes); }
00511
00512 internet_address cromp_server::any_address(int port)
00513 {
00514 const byte any_list[] = { 0, 0, 0, 0 };
00515 return internet_address(byte_array(4, any_list), "", port);
00516 }
00517
00518 istring cromp_server::responses_text_form() const
00519 { return octo()->responses().text_form(); }
00520
00521 int cromp_server::DEFAULT_ACCEPTERS() {
00522
00523
00524 #ifdef EMBEDDED_BUILD
00525 return 1;
00526 #else
00527 return 7;
00528 #endif
00529 }
00530
00531 #ifndef OMIT_CRYPTO_SUPPORT
00532 infoton *cromp_server::wrap_infoton(infoton * &request,
00533 const octopus_entity &ent)
00534 {
00535 FUNCDEF("wrap_infoton");
00536
00537
00538
00539
00540
00541 if (dynamic_cast<identity_infoton *>(request)
00542 || dynamic_cast<encryption_infoton *>(request)
00543 || dynamic_cast<encryption_wrapper *>(request)) return NIL;
00544
00545 #ifdef DEBUG_CROMP_SERVER
00546 LOG(istring("encrypting ") + request->text_form());
00547 #endif
00548
00549 octenc_key_record *key = _encrypt_arm->keys().lock(ent);
00550
00551 if (!key) {
00552 LOG(istring("failed to locate key for entity ") + ent.text_form());
00553 return NIL;
00554 }
00555 byte_array packed_request;
00556 infoton::fast_pack(packed_request, *request);
00557 WHACK(request);
00558 encryption_wrapper *to_return = new encryption_wrapper;
00559 key->_key.encrypt(packed_request, to_return->_wrapped);
00560 _encrypt_arm->keys().unlock(key);
00561 return to_return;
00562 }
00563 #endif
00564
00565 void cromp_server::enable_servers(bool encrypt, cromp_security *security)
00566 {
00567 FUNCDEF("enable_servers");
00568 if (encrypt) {
00569 #ifndef OMIT_CRYPTO_SUPPORT
00570
00571 #ifdef DEBUG_CROMP_SERVER
00572 LOG(istring("enabling encryption for ") + class_name()
00573 + " on " + _where->text_form());
00574 #endif
00575 _encrypt_arm = new encryption_tentacle;
00576 add_tentacle(_encrypt_arm, true);
00577 add_tentacle(new unwrapping_tentacle, false);
00578 #endif
00579 }
00580 if (security) {
00581 add_tentacle(new login_tentacle(*security), true);
00582 } else {
00583 add_tentacle(new login_tentacle(*_default_security), true);
00584 }
00585 open_common(*_where);
00586
00587 accept_one_client(false);
00588
00589 #ifdef DEBUG_CROMP_SERVER
00590 LOG(isprintf("adding %d accepting threads.", _accepting_threads));
00591 #endif
00592 for (int i = 0; i < _accepting_threads; i++) {
00593
00594 _accepters->add_thread(new connection_management_thread(*this), true, NIL);
00595 }
00596
00597 _dropper->start(NIL);
00598 _enabled = true;
00599 }
00600
00601 void cromp_server::disable_servers()
00602 {
00603 FUNCDEF("disable_servers");
00604 _dropper->stop();
00605 _accepters->stop_all();
00606 if (_clients) {
00607 LOCK_LISTS;
00608
00609
00610 for (int i = 0; i < _clients->elements(); i++) {
00611
00612 cromp_client_record *cli = (*_clients)[i];
00613 if (cli) cli->croak();
00614 }
00615 }
00616
00617 close_common();
00618
00619
00620
00621
00622
00623
00624 WHACK(_clients);
00625
00626 _enabled = false;
00627 }
00628
00629 int cromp_server::clients() const
00630 {
00631 LOCK_LISTS;
00632 return _clients? _clients->elements() : 0;
00633 }
00634
00635 bool cromp_server::find_entity(const octopus_entity &id,
00636 internet_address &found)
00637 {
00638 FUNCDEF("find_entity");
00639 found = internet_address();
00640 LOCK_LISTS;
00641 int indy = _clients->find(id);
00642 if (negative(indy)) return false;
00643 cromp_client_record *cli = (*_clients)[indy];
00644
00645 found = cli->spock()->remote();
00646 return true;
00647 }
00648
00649 outcome cromp_server::accept_one_client(bool wait)
00650 {
00651 FUNCDEF("accept_one_client");
00652 spocket *accepted = NIL;
00653
00654 outcome ret = spock()->accept(accepted, wait);
00655
00656
00657 if ( (ret == spocket::OKAY) && accepted) {
00658
00659 cromp_client_record *adding = new cromp_client_record(*this, accepted,
00660 octo());
00661 #ifdef DEBUG_CROMP_SERVER
00662 LOG(isprintf("found a new client on sock %d.", accepted->OS_socket()));
00663 #endif
00664 LOCK_LISTS;
00665 _clients->append(adding);
00666 return OKAY;
00667 } else {
00668 if (ret == spocket::NO_CONNECTION)
00669 return NOT_FOUND;
00670 #ifdef DEBUG_CROMP_SERVER
00671 LOG(istring("error accepting client: ") + spocket::outcome_name(ret));
00672 #endif
00673 return DISALLOWED;
00674 }
00675 }
00676
00677 void cromp_server::look_for_clients(ithread &requestor)
00678 {
00679 FUNCDEF("look_for_clients");
00680
00681 while (!requestor.should_stop()) {
00682 outcome ret = accept_one_client(false);
00683 if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
00684
00685
00686 LOG(istring("got real error on socket; leaving for good.")
00687 + spocket::outcome_name(ret));
00688
00689 break;
00690 }
00691
00692
00693 if (ret != OKAY)
00694 portable::sleep_ms(ACCEPTANCE_SNOOZE);
00695 }
00696 }
00697
00698 outcome cromp_server::send_to_client(const octopus_request_id &id,
00699 infoton *data)
00700 {
00701 FUNCDEF("send_to_client");
00702 if (!octo()->responses().add_item(data, id)) {
00703 #ifdef DEBUG_CROMP_SERVER
00704 LOG("failed to store result for client--no space left currently.");
00705 #endif
00706 return TOO_FULL;
00707 }
00708 return OKAY;
00709 }
00710
00711 outcome cromp_server::get_from_client(const octopus_request_id &id,
00712 infoton * &data, int timeout)
00713 {
00714 FUNCDEF("get_from_client");
00715
00716
00717 LOCK_LISTS;
00718 int indy = _clients->find(id._entity);
00719 if (negative(indy)) return NOT_FOUND;
00720 cromp_client_record *cli = (*_clients)[indy];
00721 return cli->retrieve_and_restore(data, id, timeout);
00722 }
00723
00724 void cromp_server::drop_dead_clients()
00725 {
00726 FUNCDEF("drop_dead_clients");
00727
00728
00729 {
00730 LOCK_LISTS;
00731 if (time_stamp() < *_next_droppage) return;
00732 }
00733
00734 LOCK_LISTS;
00735 for (int i = 0; i < _clients->elements(); i++) {
00736 cromp_client_record *cli = (*_clients)[i];
00737 if (!cli) {
00738 #ifdef DEBUG_CROMP_SERVER
00739 LOG(istring("error in list structure."));
00740 #endif
00741 _clients->zap(i, i);
00742 i--;
00743 continue;
00744 }
00745 if (!cli->still_connected() || !cli->healthy()) {
00746 #ifdef DEBUG_CROMP_SERVER
00747 LOG(istring("dropping disconnected client ") + cli->ent().mangled_form());
00748 #endif
00749 cli->croak();
00750
00751
00752 _clients->zap(i, i);
00753 i--;
00754 continue;
00755 }
00756 }
00757
00758 _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
00759 }
00760
00761
00762 #endif //CROMP_SERVER_IMPLEMENTATION_FILE
00763