cromp_server.cpp

Go to the documentation of this file.
00001 #ifndef CROMP_SERVER_IMPLEMENTATION_FILE
00002 #define CROMP_SERVER_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : cromp_server                                                      *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "cromp_common.h"
00019 #include "cromp_security.h"
00020 #include "cromp_server.h"
00021 
00022 #include <basis/function.h>
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/portable.h>
00026 #include <basis/mutex.h>
00027 #include <data_struct/amorph.h>
00028 #include <data_struct/unique_id.h>
00029 #include <mechanisms/ithread.h>
00030 #include <mechanisms/thread_cabinet.h>
00031 #include <octopus/entity_data_bin.h>
00032 #include <octopus/entity_defs.h>
00033 #include <octopus/identity_infoton.h>
00034 #include <octopus/infoton.h>
00035 #include <octopus/tentacle.h>
00036 #include <octopus/unhandled_request.h>
00037 #include <sockets/address.h>
00038 #include <sockets/tcpip_stack.h>
00039 #include <sockets/spocket.h>
00040 #include <tentacles/key_repository.h>
00041 #include <tentacles/login_tentacle.h>
00042 
00043 #ifndef OMIT_CRYPTO_SUPPORT
00044   #include <tentacles/encryption_tentacle.h>
00045   #include <tentacles/encryption_wrapper.h>
00046 #endif
00047 
00048 //#define DEBUG_CROMP_SERVER
00049   // uncomment for noisy version.
00050 
00051 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
00052   // we will drop any clients that have disconnected this long ago.
00053 
00054 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
00055   // this is the maximum number of things we'll do in one run for a
00056   // client, including both sends and receives.
00057 
00058 const int SEND_TRIES_ALLOWED = 1;
00059   // the number of attempts we will make to get outgoing data to send.
00060 
00061 const int SEND_THRESHOLD = 512 * KILOBYTE;
00062   // if we pile up some data to this point in our client gathering, we'll
00063   // go ahead and start pushing it to the client.
00064 
00065 const int EXTREME_SEND_TRIES_ALLOWED = 28;
00066   // if we're clogged, we'll push this many times to get data out.
00067 
00068 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
00069   // the maximum size we want our buffer to grow.
00070 
00071 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
00072   // the largest chunk of updates we'll try to grab at one time.
00073 
00074 const int DROPPING_INTERVAL = 500;
00075   // the rate at which we'll check for dead clients and clean up.
00076 
00077 const int DATA_AWAIT_TIMEOUT = 14;
00078   // how long the server zones out waiting for data.
00079 
00080 const int ACCEPTANCE_SNOOZE = 60;
00081   // if the server sees no clients, it will take a little nap.
00082 
00083 #undef LOG
00084 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
00085 
00087 
00088 // forward.
00089 class cromp_client_record;
00090 
00091 class cromp_data_grabber : public ithread
00092 {
00093 public:
00094   cromp_data_grabber(cromp_client_record &parent, octopus *octo)
00095       : ithread(), _parent(parent), _octo(octo) {}
00096 
00097   IMPLEMENT_CLASS_NAME("cromp_data_grabber");
00098 
00099   virtual void perform_activity(void *);
00100 
00101 private:
00102   cromp_client_record &_parent;
00103   octopus *_octo;
00104 };
00105  
00107 
00108 class cromp_client_record : public cromp_common
00109 {
00110 public:
00111   cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
00112       login_tentacle &security)
00113   : cromp_common(client, octo),
00114     _parent(parent),
00115     _octo(octo),
00116     _ent(),
00117     _healthy(true),
00118     _fixated(false),
00119     _grabber(*this, octo),
00120     _waiting(),
00121     _still_connected(true),
00122     _security_arm(security)
00123   {
00124     internet_address local_addr = internet_address
00125         (internet_address::localhost(), client->stack().hostname(), 0);
00126     open_common(local_addr);  // open the common support for biz.
00127     _grabber.start(NIL);  // crank up our background data pump on the socket.
00128   }
00129 
00130   ~cromp_client_record() {
00131     croak();
00132   }
00133 
00134   IMPLEMENT_CLASS_NAME("cromp_client_record");
00135 
00136   bool handle_client_needs(ithread &prompter) {
00137     FUNCDEF("handle_client_needs");
00138 #ifdef DEBUG_CROMP_SERVER
00139     time_stamp start;
00140 #endif
00141     if (!_healthy) return false;  // done.
00142     if (!spock()->connected()) {
00143       _still_connected = false;
00144       return false;  // need to stop now.
00145     }
00146     bool keep_going = true;
00147     int actions = 0;
00148     while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
00149       // make sure we don't overstay our welcome when the thread's supposed
00150       // to quit.
00151       if (prompter.should_stop()) return false;
00152       keep_going = false;  // only continue if there's a reason.
00153       bool ret = get_incoming_data(actions);  // look for requests.
00154       if (ret) keep_going = true;
00155       ret = push_client_replies(actions);  // send replies back to the client.
00156       if (ret) keep_going = true;
00157     }
00158 
00159 
00160 #ifdef DEBUG_CROMP_SERVER
00161     if (actions > 10) {
00162       LOG(isprintf("actions=%d", actions));
00163       LOG(isprintf("%d pending send bytes, %d bytes accumulated, bin has "
00164           "%d items.", pending_sends(), accumulated_bytes(),
00165           octo()->responses().items_held()));
00166     }
00167 
00168     int duration = int(time_stamp().value() - start.value());
00169     if (duration > 200) {
00170       LOG(isprintf("duration=%d ms.", duration));
00171     }
00172 #endif
00173 
00174     return true;
00175   }
00176 
00177   const octopus_entity &ent() const { return _ent; }
00178 
00179   // stops the background activity of this object and drops the connection
00180   // to the client.
00181   void croak() {
00182     FUNCDEF("croak");
00183     _grabber.stop();
00184     int actions = 0;
00185     while (get_incoming_data(actions)) {
00186       // keep receiving whatever's there already.  we are trying to drain
00187       // the socket before destroying it.
00188     }
00189     _healthy = false;
00190     // clean out any records for this goner.
00191     _security_arm.expunge(_ent);
00192     close_common();
00193   }
00194 
00195   bool healthy() const { return _healthy; }
00196     // this is true unless the object has been told to shut down.
00197 
00198   bool still_connected() const { return _still_connected; }
00199     // this is true unless the client side dropped the connection.
00200 
00201   cromp_server &parent() const { return _parent; }
00202 
00203   bool push_client_replies(int &actions) {
00204     FUNCDEF("push_client_replies");
00205     if (!healthy()) return false;
00206     if (ent().blank()) {
00207       // not pushing replies if we haven't even gotten a command yet.
00208 #ifdef DEBUG_CROMP_SERVER
00209       LOG("not pushing replies for blank.");
00210 #endif
00211       return false;
00212     }
00213 
00214     if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00215 LOG("buffer clog being cleared now.");
00216       // the buffers are pretty full; we'll try later.
00217       push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
00218       // if we're still clogged, then leave.
00219       if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00220 LOG("could not completely clear buffer clog.");
00221         return true;
00222       }
00223 LOG("cleared out buffer clog.");
00224     }
00225 
00226     int any_left = true;
00227     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00228       // make sure we're not wasting our time.
00229       if (!_octo->responses().items_held()) {
00230         any_left = false;
00231         break;
00232       }
00233       // make sure we don't ignore receptions.
00234       grab_anything(false);
00235       // try to grab a result for this entity.
00236       int num_located = _octo->responses().acquire_for_entity(ent(),
00237           _waiting, MAXIMUM_SIZE_BATCH);
00238       if (!num_located) {
00239         any_left = false;
00240         break;
00241       }
00242 
00243 #ifndef OMIT_CRYPTO_SUPPORT
00244       // if we're encrypting, we need to wrap these as well.
00245       if (_parent.encrypting()) {
00246         for (int i = 0; i < _waiting.elements(); i++) {
00247           infoton *curr = _waiting[i]->_data;
00248           infoton *processed = _parent.wrap_infoton(curr,
00249               _waiting[i]->_id._entity);
00250           if (processed) _waiting[i]->_data = processed;  // replace infoton.
00251         }
00252       }
00253 #endif
00254 
00255       outcome ret = pack_and_ship(_waiting, 0);
00256         // no attempt to send yet; we're just stuffing the buffer.
00257       if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
00258 //hmmm: what about keeping transmission as held in list; retry later on it?
00259 
00260 //#ifdef DEBUG_CROMP_SERVER
00261         LOG(istring("failed to send package back to client: ")
00262             + cromp_common::outcome_name(ret));
00263 //#endif
00264         any_left = false;
00265         break;
00266       }
00267 
00268       if (pending_sends() > SEND_THRESHOLD) {
00269 #ifdef DEBUG_CROMP_SERVER
00270         LOG(istring("over sending threshold on ") + _ent.text_form());
00271 #endif
00272         push_outgoing(SEND_TRIES_ALLOWED);
00273       }
00274 
00275     }
00276     // now that we've got a pile possibly, we'll try to send them out.
00277     push_outgoing(SEND_TRIES_ALLOWED);
00278     if (!spock()->connected()) {
00279 #ifdef DEBUG_CROMP_SERVER
00280       LOG("noticed disconnection of client.");
00281 #endif
00282       _still_connected = false;
00283     }
00284     return any_left;
00285   }
00286 
00287   bool get_incoming_data(int &actions) {
00288     FUNCDEF("get_incoming_data");
00289     if (!healthy()) return false;
00290     int first_one = true;
00291     bool saw_something = false;  // true if we got a packet.
00292     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00293       // pull in anything waiting.
00294       infoton *item = NIL;
00295       octopus_request_id req_id;
00296       outcome ret = retrieve_and_restore_any(item, req_id,
00297           first_one? DATA_AWAIT_TIMEOUT : 0);
00298       first_one = false;
00299       if (ret == cromp_common::TIMED_OUT) {
00300         actions--;  // didn't actually eat one.
00301         return false;
00302       } else if (ret != cromp_common::OKAY) {
00303 #ifdef DEBUG_CROMP_SERVER
00304         LOG(istring("got error ") + cromp_common::outcome_name(ret));
00305 #endif
00306         if (ret == cromp_common::NO_CONNECTION) {
00307 #ifdef DEBUG_CROMP_SERVER
00308           LOG("noticed disconnection of client.");
00309 #endif
00310           _still_connected = false;
00311         }
00312         actions--;  // didn't actually eat one.
00313         return false;  // get outa here.
00314       }
00315       // got a packet.
00316       saw_something = true;
00317       if (!_fixated) {
00318         if (req_id._entity.blank()) {
00319           LOG(istring("would have assigned ours to blank id! ")
00320               + req_id._entity.mangled_form());
00321           WHACK(item);
00322           continue;
00323         }
00324 #ifdef DEBUG_CROMP_SERVER
00325         LOG(istring("cmd with entity ") + req_id._entity.mangled_form());
00326 #endif
00327         if (_ent.blank()) {
00328           // assign the entity id now that we know it.
00329           _ent = req_id._entity;
00330 #ifdef DEBUG_CROMP_SERVER
00331           LOG(istring("assigned own entity to ") + _ent.mangled_form());
00332 #endif
00333         } else if (!_fixated && (_ent != req_id._entity) ) {
00334 #ifdef DEBUG_CROMP_SERVER
00335           LOG(istring("fixated on entity of ") + req_id._entity.mangled_form()
00336               + " where we used to have " + _ent.mangled_form());
00337 #endif
00338           _ent = req_id._entity;
00339           _fixated = true;
00340         }
00341       }  // connects to line after debug just below.
00342 #ifdef DEBUG_CROMP_SERVER
00343         else if (_ent != req_id._entity) {
00344         // this checks the validity of the entity.
00345 #ifdef DEBUG_CROMP_SERVER
00346         LOG(istring("seeing wrong entity of ") + req_id._entity.mangled_form()
00347             + " when we fixated on " + _ent.mangled_form());
00348 #endif
00349         WHACK(item);
00350         continue;
00351       }
00352 #endif
00353       // check again so we make sure we're still healthy; could have changed
00354       // state while getting a command.
00355       if (!healthy()) {
00356         WHACK(item);
00357         continue;
00358       }
00359       string_array classif = item->classifier();
00360         // hang onto the classifier since the next time we get a chance, the
00361         // object might be destroyed.
00362 
00363       // we pass responsibility for this item over to the octopus.  that's why
00364       // we're not deleting it once evaluate gets the item.
00365       ret = _octo->evaluate(item, req_id, _parent.instantaneous());
00366       if (ret != tentacle::OKAY) {
00367 #ifdef DEBUG_CROMP_SERVER
00368         LOG(istring("failed to evaluate the infoton we got: ")
00369             + classif.text_form());
00370 #endif
00371 //hmmm: we have upgraded this response to be for all errors, since otherwise
00372 //      clients will just time out waiting for something that's never coming.
00373 
00374         // we do a special type of handling when the tentacle is missing.  this
00375         // is almost always because the wrong type of request is being sent to
00376         // a server, or the server didn't register for all the objects it is
00377         // supposed to handle.
00379 //#ifdef DEBUG_CROMP_SERVER
00380           LOG(istring("injecting unhandled note into response stream for ")
00381               + req_id.text_form() + ", got outcome " + outcome_name(ret));
00382 //#endif
00383           _parent.send_to_client(req_id,
00384               new unhandled_request(req_id, classif, ret));
00385             // this will always work, although it's not a surety that the
00386             // client actually still exists.  probably though, since we're
00387             // just now handling this request.
00389       }
00390     }
00391     return saw_something;  // keep going if we actually did anything good.
00392   }
00393 
00394 private:
00395   cromp_server &_parent;  // the object that owns this client.
00396   octopus *_octo;
00397   octopus_entity _ent;  // the entity by which we know this client.
00398   bool _healthy;  // reports our current state of happiness.
00399   bool _fixated;  // true if the entity id has become firm.
00400   cromp_data_grabber _grabber;  // the data grabbing thread.
00401   infoton_list _waiting;
00402     // used by the push_client_replies() method; allocated once to avoid churn.
00403   bool _still_connected;
00404     // set to true up until we notice that the client disconnected.
00405   login_tentacle &_security_arm;  // provides login checking.
00406 };
00407 
00409 
00410 void cromp_data_grabber::perform_activity(void *)
00411 {
00412   FUNCDEF("perform_activity");
00413   while (!should_stop()) {
00414 //    time_stamp started;
00415     bool ret = _parent.handle_client_needs(*this);
00416 //    int duration = int(time_stamp().value() - started.value());
00417     if (!ret) {
00418       // they said to stop.
00419 #ifdef DEBUG_CROMP_SERVER
00420       LOG("done handling client needs.");
00421 #endif
00422       _octo->expunge(_parent.ent());
00423       break;
00424     }
00425   }
00426 }
00427 
00429 
00430 class cromp_client_list : public amorph<cromp_client_record>
00431 {
00432 public:
00433   int find(const octopus_entity &to_find) const {
00434     for (int i = 0; i < elements(); i++)
00435       if (to_find == get(i)->ent()) return i;
00436     return common::NOT_FOUND;
00437   }
00438 };
00439 
00441 
00442 class client_dropping_thread : public ithread
00443 {
00444 public:
00445   client_dropping_thread (cromp_server &parent)
00446   : ithread(DROPPING_INTERVAL),
00447     _parent(parent) {}
00448 
00449   void perform_activity(void *formal(ptr)) {
00450     FUNCDEF("perform_activity");
00451     _parent.drop_dead_clients(); 
00452   }
00453 
00454 private:
00455   cromp_server &_parent;  // we perform tricks for this object.
00456 };
00457 
00459 
00460 class connection_management_thread : public ithread
00461 {
00462 public:
00463   connection_management_thread(cromp_server &parent)
00464   : ithread(),
00465     _parent(parent) {}
00466 
00467   void perform_activity(void *formal(ptr)) {
00468     FUNCDEF("perform_activity");
00469     _parent.look_for_clients(*this); 
00470   }
00471 
00472 private:
00473   cromp_server &_parent;  // we perform tricks for this object.
00474 };
00475 
00477 
00478 #undef LOCK_LISTS
00479 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
00480   // takes over access to the client list and root socket.
00481 
00482 cromp_server::cromp_server(const internet_address &where,
00483     int accepting_threads, bool instantaneous, int max_per_ent)
00484 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
00485   _clients(new cromp_client_list),
00486   _accepters(new thread_cabinet),
00487   _list_lock(new mutex),
00488   _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
00489   _instantaneous(instantaneous),
00490   _where(new internet_address(where)),
00491   _accepting_threads(accepting_threads),
00492   _dropper(new client_dropping_thread(*this)),
00493   _enabled(false),
00494   _encrypt_arm(NIL),
00495   _default_security(new cromp_security),
00496   _security_arm(NIL)
00497 {
00498   FUNCDEF("constructor");
00499 }
00500  
00501 cromp_server::~cromp_server()
00502 {
00503   disable_servers();
00504   WHACK(_accepters);
00505   WHACK(_dropper);
00506   WHACK(_clients);
00507   WHACK(_next_droppage);
00508   WHACK(_where);
00509   WHACK(_default_security);
00510   WHACK(_list_lock);
00511   _encrypt_arm = NIL;
00512   _security_arm = NIL;
00513 }
00514 
00515 internet_address cromp_server::location() const { return *_where; }
00516 
00517 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
00518 { return octo()->responses().get_sizes(id, items, bytes); }
00519 
00520 internet_address cromp_server::any_address(int port)
00521 {
00522   const byte any_list[] = { 0, 0, 0, 0 };
00523   return internet_address(byte_array(4, any_list), "", port);
00524 }
00525 
00526 istring cromp_server::responses_text_form() const
00527 { return octo()->responses().text_form(); }
00528 
00529 int cromp_server::DEFAULT_ACCEPTERS() {
00530   // default number of listening threads; this is the maximum number of mostly
00531   // simultaneous connections that the server can pick up at a time.
00532 #ifdef EMBEDDED_BUILD
00533   return 1;  // calm number of threads for embedded systems.
00534 #else
00535   return 7;  // others are not generally so limited on resources.
00536 #endif
00537 }
00538 
00539 #ifndef OMIT_CRYPTO_SUPPORT
00540 infoton *cromp_server::wrap_infoton(infoton * &request,
00541     const octopus_entity &ent)
00542 {
00543   FUNCDEF("wrap_infoton");
00544   if (!_enabled) return NIL;
00545   // identity is not wrapped with encryption; we need to establish and identity
00546   // to talk on a distinct channel with the server.  even if that identity were
00547   // compromised, the interloper should still not be able to listen in on the
00548   // establishment of an encryption channel.  also, the encryption startup
00549   // itself is not encrypted and we don't want to re-encrypt the wrapper.
00550   if (dynamic_cast<identity_infoton *>(request)
00551       || dynamic_cast<encryption_infoton *>(request)
00552       || dynamic_cast<encryption_wrapper *>(request)) return NIL;
00553 
00554 #ifdef DEBUG_CROMP_SERVER
00555   LOG(istring("encrypting ") + request->text_form());
00556 #endif
00557 
00558   octenc_key_record *key = _encrypt_arm->keys().lock(ent);
00559     // lock here is released a bit down below.
00560   if (!key) {
00561     LOG(istring("failed to locate key for entity ") + ent.text_form());
00562     return NIL;
00563   }
00564   byte_array packed_request;
00565   infoton::fast_pack(packed_request, *request);
00566   WHACK(request);
00567   encryption_wrapper *to_return = new encryption_wrapper;
00568   key->_key.encrypt(packed_request, to_return->_wrapped);
00569   _encrypt_arm->keys().unlock(key);
00570   return to_return;
00571 }
00572 #endif
00573 
00574 outcome cromp_server::enable_servers(bool encrypt, cromp_security *security)
00575 {
00576   FUNCDEF("enable_servers");
00577   if (encrypt) {
00578 #ifndef OMIT_CRYPTO_SUPPORT
00579     // add the tentacles needed for encryption.
00580 #ifdef DEBUG_CROMP_SERVER
00581     LOG(istring("enabling encryption for ") + class_name()
00582         + " on " + _where->text_form());
00583 #endif
00584     _encrypt_arm = new encryption_tentacle;
00585     add_tentacle(_encrypt_arm, true);
00586     add_tentacle(new unwrapping_tentacle, false);
00587 #endif
00588   }
00589   WHACK(_security_arm);  // in case being reused.
00590   if (security) {
00591     _security_arm = new login_tentacle(*security);
00592     add_tentacle(_security_arm, true);
00593   } else {
00594     _security_arm = new login_tentacle(*_default_security);
00595     add_tentacle(_security_arm, true);
00596   }
00597   open_common(*_where);  // open the common ground.
00598 
00599   _enabled = true;
00600   outcome to_return = accept_one_client(false);  // try first accept, no waiting.
00601   if (to_return != common::NOT_FOUND) {
00602     LOG(istring("failure starting up server: ") + outcome_name(to_return));
00603     return to_return;
00604   }
00605 
00606 #ifdef DEBUG_CROMP_SERVER
00607   LOG(isprintf("adding %d accepting threads.", _accepting_threads));
00608 #endif
00609   for (int i = 0; i < _accepting_threads; i++) {
00610     // crank in a new thread and tell it yes on starting it.
00611     _accepters->add_thread(new connection_management_thread(*this), true, NIL);
00612   }
00613 
00614   _dropper->start(NIL);
00615   return OKAY;
00616 }
00617 
00618 void cromp_server::disable_servers()
00619 {
00620   FUNCDEF("disable_servers");
00621   if (!_enabled) return;
00622   _dropper->stop();  // signal the thread to leave when it can.
00623   _accepters->stop_all();  // signal the accepting threads to exit.
00624   if (_clients) {
00625     LOCK_LISTS;
00626       // make sure no one rearranges or uses the client list while we're
00627       // working on it.
00628     for (int i = 0; i < _clients->elements(); i++) {
00629       // stop the client's activities before the big shutdown.
00630       cromp_client_record *cli = (*_clients)[i];
00631       if (cli) cli->croak();
00632     }
00633   }
00634 
00635   close_common();  // zap the socket so that our blocked waiters get woken up.
00636 
00637   // now finalize the shutdown.  we don't grab the lock because we don't want
00638   // a deadlock, but we also shouldn't need to grab the lock.  by here, we have
00639   // cancelled all threads, no new clients should be able to be added, and the
00640   // destruction of this list will ensure that each client's thread really is
00641   // stopped.
00642   WHACK(_clients);
00643 
00644   _enabled = false;  // record our defunctivity.
00645 }
00646 
00647 int cromp_server::clients() const
00648 {
00649   LOCK_LISTS;
00650   return _clients? _clients->elements() : 0;
00651 }
00652 
00653 bool cromp_server::disconnect_entity(const octopus_entity &id)
00654 {
00655   FUNCDEF("disconnect_entity");
00656   if (!_enabled) return false;
00657   LOCK_LISTS;
00658   int indy = _clients->find(id);
00659   if (negative(indy)) return false;  // didn't find it.
00660   cromp_client_record *cli = (*_clients)[indy];
00661   // disconnect the client and zap its entity records.
00662   cli->croak();
00663   return true;
00664 }
00665 
00666 bool cromp_server::find_entity(const octopus_entity &id,
00667     internet_address &found)
00668 {
00669   FUNCDEF("find_entity");
00670   if (!_enabled) return false;
00671   found = internet_address();
00672   LOCK_LISTS;
00673   int indy = _clients->find(id);
00674   if (negative(indy)) return false;  // didn't find it.
00675   cromp_client_record *cli = (*_clients)[indy];
00676     // pull out the address from the record at that index.
00677   found = cli->spock()->remote();
00678   return true;
00679 }
00680 
00681 outcome cromp_server::accept_one_client(bool wait)
00682 {
00683   FUNCDEF("accept_one_client");
00684   if (!_enabled) return common::INCOMPLETE;
00685   spocket *accepted = NIL;
00686 //printf((timestamp(true, true) + "into accept\n").s());
00687   outcome ret = spock()->accept(accepted, wait);
00688 //printf((timestamp(true, true) + "out of accept\n").s());
00689     // accept and wait for it to finish.
00690   if ( (ret == spocket::OKAY) && accepted) {
00691     // we got a new client to talk to.
00692     cromp_client_record *adding = new cromp_client_record(*this, accepted,
00693         octo(), *_security_arm);
00694 #ifdef DEBUG_CROMP_SERVER
00695     LOG(isprintf("found a new client on sock %d.", accepted->OS_socket()));
00696 #endif
00697     LOCK_LISTS;  // short term lock.
00698     _clients->append(adding);
00699     return OKAY;
00700   } else {
00701     if (ret == spocket::NO_CONNECTION)
00702       return NOT_FOUND;  // normal occurrence.
00703 #ifdef DEBUG_CROMP_SERVER
00704     LOG(istring("error accepting client: ") + spocket::outcome_name(ret));
00705 #endif
00706     return DISALLOWED;
00707   }
00708 }
00709 
00710 void cromp_server::look_for_clients(ithread &requestor)
00711 {
00712   FUNCDEF("look_for_clients");
00713   if (!_enabled) return;
00714   // see if any clients have been accepted.
00715   while (!requestor.should_stop()) {
00716     outcome ret = accept_one_client(false);
00717     if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
00718       // we got an error condition besides our normal set.
00719 //#ifdef DEBUG_CROMP_SERVER
00720       LOG(istring("got real error on socket; leaving for good.")
00721           + spocket::outcome_name(ret));
00722 //#endif
00723       break;
00724     }
00725     // if we weren't told we got a client, then we'll sleep.  if we did get
00726     // a client, we'll try again right away.
00727     if (ret != OKAY)
00728       portable::sleep_ms(ACCEPTANCE_SNOOZE);
00729   }
00730 }
00731 
00732 outcome cromp_server::send_to_client(const octopus_request_id &id,
00733     infoton *data)
00734 {
00735   FUNCDEF("send_to_client");
00736   if (!_enabled) return common::INCOMPLETE;
00737   if (!octo()->responses().add_item(data, id)) {
00738 #ifdef DEBUG_CROMP_SERVER
00739     LOG("failed to store result for client--no space left currently.");
00740 #endif
00741     return TOO_FULL;
00742   }
00743   return OKAY;
00744 }
00745 
00746 /*outcome cromp_server::get_any_from_client(const octopus_entity &ent,
00747     infoton * &data, int timeout)
00748 {
00749   FUNCDEF("get_from_client");
00750 //hmmm: this implementation locks the lists; can't we get the client to do
00751 //      most of the work for this?
00752   LOCK_LISTS;
00753   int indy = _clients->find(id._entity);
00754   if (negative(indy)) return NOT_FOUND;  // didn't find it.
00755   cromp_client_record *cli = (*_clients)[indy];
00756   octopus_request_id id;
00757   return cli->retrieve_and_restore_any(data, ent, timeout);
00758 }
00759 */
00760 
00761 outcome cromp_server::get_from_client(const octopus_request_id &id,
00762     infoton * &data, int timeout)
00763 {
00764   FUNCDEF("get_from_client");
00765   if (!_enabled) return common::INCOMPLETE;
00766 //hmmm: this implementation locks the lists; can't we get the client to do
00767 //      most of the work for this?
00768   LOCK_LISTS;
00769   int indy = _clients->find(id._entity);
00770   if (negative(indy)) return NOT_FOUND;  // didn't find it.
00771   cromp_client_record *cli = (*_clients)[indy];
00772   return cli->retrieve_and_restore(data, id, timeout);
00773 }
00774 
00775 void cromp_server::drop_dead_clients()
00776 {
00777   FUNCDEF("drop_dead_clients");
00778   if (!_enabled) return;
00779   // clean out any dead clients.
00780 
00781   {
00782     LOCK_LISTS;
00783     if (time_stamp() < *_next_droppage) return;  // not time yet.
00784   }
00785 
00786   LOCK_LISTS;  // keep locked from now on.
00787   for (int i = 0; i < _clients->elements(); i++) {
00788     cromp_client_record *cli = (*_clients)[i];
00789     if (!cli) {
00790 #ifdef DEBUG_CROMP_SERVER
00791       LOG(istring("error in list structure."));
00792 #endif
00793       _clients->zap(i, i);
00794       i--;   // skip back before deleted guy.
00795       continue;
00796     }
00797     if (!cli->still_connected() || !cli->healthy()) {
00798 #ifdef DEBUG_CROMP_SERVER
00799       LOG(istring("dropping disconnected client ") + cli->ent().mangled_form());
00800 #endif
00801       cli->croak();  // stop it from operating.
00802 
00803 //hmmm: check if it has data waiting and complain about it perhaps.
00804       _clients->zap(i, i);
00805       i--;   // skip back before deleted guy.
00806       continue;
00807     }
00808   }
00809 
00810   _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
00811 }
00812 
00813 
00814 #endif //CROMP_SERVER_IMPLEMENTATION_FILE
00815 

Generated on Mon Jul 26 04:22:29 2010 for HOOPLE Libraries by  doxygen 1.5.6