cromp_server.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : cromp_server                                                      *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include "cromp_common.h"
00016 #include "cromp_security.h"
00017 #include "cromp_server.h"
00018 
00019 #include <basis/astring.h>
00020 #include <basis/functions.h>
00021 #include <basis/mutex.h>
00022 #include <loggers/program_wide_logger.h>
00023 #include <octopus/entity_data_bin.h>
00024 #include <octopus/entity_defs.h>
00025 #include <octopus/identity_infoton.h>
00026 #include <octopus/infoton.h>
00027 #include <octopus/tentacle.h>
00028 #include <octopus/unhandled_request.h>
00029 #include <processes/ethread.h>
00030 #include <processes/thread_cabinet.h>
00031 #include <sockets/internet_address.h>
00032 #include <sockets/tcpip_stack.h>
00033 #include <sockets/spocket.h>
00034 #include <structures/amorph.h>
00035 #include <structures/unique_id.h>
00036 #include <tentacles/key_repository.h>
00037 #include <tentacles/login_tentacle.h>
00038 #include <tentacles/encryption_tentacle.h>
00039 #include <tentacles/encryption_wrapper.h>
00040 #include <timely/time_control.h>
00041 
00042 using namespace basis;
00043 using namespace loggers;
00044 using namespace octopi;
00045 using namespace processes;
00046 using namespace sockets;
00047 using namespace structures;
00048 using namespace timely;
00049 
00050 namespace cromp {
00051 
00052 //#define DEBUG_CROMP_SERVER
00053   // uncomment for noisy version.
00054 
00055 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
00056   // we will drop any clients that have disconnected this long ago.
00057 
00058 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
00059   // this is the maximum number of things we'll do in one run for a
00060   // client, including both sends and receives.
00061 
00062 const int SEND_TRIES_ALLOWED = 1;
00063   // the number of attempts we will make to get outgoing data to send.
00064 
00065 const int SEND_THRESHOLD = 512 * KILOBYTE;
00066   // if we pile up some data to this point in our client gathering, we'll
00067   // go ahead and start pushing it to the client.
00068 
00069 const int EXTREME_SEND_TRIES_ALLOWED = 28;
00070   // if we're clogged, we'll push this many times to get data out.
00071 
00072 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
00073   // the maximum size we want our buffer to grow.
00074 
00075 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
00076   // the largest chunk of updates we'll try to grab at one time.
00077 
00078 const int DROPPING_INTERVAL = 500;
00079   // the rate at which we'll check for dead clients and clean up.
00080 
00081 const int DATA_AWAIT_TIMEOUT = 14;
00082   // how long the server zones out waiting for data.
00083 
00084 const int ACCEPTANCE_SNOOZE = 60;
00085   // if the server sees no clients, it will take a little nap.
00086 
00087 #undef LOG
00088 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
00089 
00091 
00092 // forward.
00093 class cromp_client_record;
00094 
00095 class cromp_data_grabber : public ethread
00096 {
00097 public:
00098   cromp_data_grabber(cromp_client_record &parent, octopus *octo)
00099       : ethread(), _parent(parent), _octo(octo) {}
00100 
00101   DEFINE_CLASS_NAME("cromp_data_grabber");
00102 
00103   virtual void perform_activity(void *);
00104 
00105 private:
00106   cromp_client_record &_parent;
00107   octopus *_octo;
00108 };
00109  
00111 
00112 class cromp_client_record : public cromp_common
00113 {
00114 public:
00115   cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
00116       login_tentacle &security)
00117   : cromp_common(client, octo),
00118     _parent(parent),
00119     _octo(octo),
00120     _ent(),
00121     _healthy(true),
00122     _fixated(false),
00123     _grabber(*this, octo),
00124     _waiting(),
00125     _still_connected(true),
00126     _security_arm(security)
00127   {
00128     internet_address local_addr = internet_address
00129         (internet_address::localhost(), client->stack().hostname(), 0);
00130     open_common(local_addr);  // open the common support for biz.
00131     _grabber.start(NIL);  // crank up our background data pump on the socket.
00132   }
00133 
00134   ~cromp_client_record() {
00135     croak();
00136   }
00137 
00138   DEFINE_CLASS_NAME("cromp_client_record");
00139 
00140   bool handle_client_needs(ethread &prompter) {
00141 #ifdef DEBUG_CROMP_SERVER
00142     FUNCDEF("handle_client_needs");
00143     time_stamp start;
00144 #endif
00145     if (!_healthy) return false;  // done.
00146     if (!spock()->connected()) {
00147       _still_connected = false;
00148       return false;  // need to stop now.
00149     }
00150     bool keep_going = true;
00151     int actions = 0;
00152     while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
00153       // make sure we don't overstay our welcome when the thread's supposed
00154       // to quit.
00155       if (prompter.should_stop()) return false;
00156       keep_going = false;  // only continue if there's a reason.
00157       bool ret = get_incoming_data(actions);  // look for requests.
00158       if (ret) keep_going = true;
00159       ret = push_client_replies(actions);  // send replies back to the client.
00160       if (ret) keep_going = true;
00161     }
00162 
00163 #ifdef DEBUG_CROMP_SERVER
00164     if (actions > 10) {
00165       LOG(a_sprintf("actions=%d", actions));
00166       LOG(a_sprintf("%d pending send bytes, %d bytes accumulated, bin has "
00167           "%d items.", pending_sends(), accumulated_bytes(),
00168           octo()->responses().items_held()));
00169     }
00170 
00171     int duration = int(time_stamp().value() - start.value());
00172     if (duration > 200) {
00173       LOG(a_sprintf("duration=%d ms.", duration));
00174     }
00175 #endif
00176 
00177     return true;
00178   }
00179 
00180   const octopus_entity &ent() const { return _ent; }
00181 
00182   // stops the background activity of this object and drops the connection
00183   // to the client.
00184   void croak() {
00185 //    FUNCDEF("croak");
00186     _grabber.stop();
00187     int actions = 0;
00188     while (get_incoming_data(actions)) {
00189       // keep receiving whatever's there already.  we are trying to drain
00190       // the socket before destroying it.
00191     }
00192     _healthy = false;
00193     // clean out any records for this goner.
00194     _security_arm.expunge(_ent);
00195     close_common();
00196   }
00197 
00198   bool healthy() const { return _healthy; }
00199     // this is true unless the object has been told to shut down.
00200 
00201   bool still_connected() const { return _still_connected; }
00202     // this is true unless the client side dropped the connection.
00203 
00204   cromp_server &parent() const { return _parent; }
00205 
00206   bool push_client_replies(int &actions) {
00207     FUNCDEF("push_client_replies");
00208     if (!healthy()) return false;
00209     if (ent().blank()) {
00210       // not pushing replies if we haven't even gotten a command yet.
00211 #ifdef DEBUG_CROMP_SERVER
00212       LOG("not pushing replies for blank.");
00213 #endif
00214       return false;
00215     }
00216 
00217     if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00218 LOG("buffer clog being cleared now.");
00219       // the buffers are pretty full; we'll try later.
00220       push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
00221       // if we're still clogged, then leave.
00222       if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00223 LOG("could not completely clear buffer clog.");
00224         return true;
00225       }
00226 LOG("cleared out buffer clog.");
00227     }
00228 
00229     int any_left = true;
00230     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00231       // make sure we're not wasting our time.
00232       if (!_octo->responses().items_held()) {
00233         any_left = false;
00234         break;
00235       }
00236       // make sure we don't ignore receptions.
00237       grab_anything(false);
00238       // try to grab a result for this entity.
00239       int num_located = _octo->responses().acquire_for_entity(ent(),
00240           _waiting, MAXIMUM_SIZE_BATCH);
00241       if (!num_located) {
00242         any_left = false;
00243         break;
00244       }
00245 
00246       // if we're encrypting, we need to wrap these as well.
00247       if (_parent.encrypting()) {
00248         for (int i = 0; i < _waiting.elements(); i++) {
00249           infoton *curr = _waiting[i]->_data;
00250           infoton *processed = _parent.wrap_infoton(curr,
00251               _waiting[i]->_id._entity);
00252           if (processed) _waiting[i]->_data = processed;  // replace infoton.
00253         }
00254       }
00255 
00256       outcome ret = pack_and_ship(_waiting, 0);
00257         // no attempt to send yet; we're just stuffing the buffer.
00258       if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
00259 //hmmm: what about keeping transmission as held in list; retry later on it?
00260 
00261 //#ifdef DEBUG_CROMP_SERVER
00262         LOG(astring("failed to send package back to client: ")
00263             + cromp_common::outcome_name(ret));
00264 //#endif
00265         any_left = false;
00266         break;
00267       }
00268 
00269       if (pending_sends() > SEND_THRESHOLD) {
00270 #ifdef DEBUG_CROMP_SERVER
00271         LOG(astring("over sending threshold on ") + _ent.text_form());
00272 #endif
00273         push_outgoing(SEND_TRIES_ALLOWED);
00274       }
00275 
00276     }
00277     // now that we've got a pile possibly, we'll try to send them out.
00278     push_outgoing(SEND_TRIES_ALLOWED);
00279     if (!spock()->connected()) {
00280 #ifdef DEBUG_CROMP_SERVER
00281       LOG("noticed disconnection of client.");
00282 #endif
00283       _still_connected = false;
00284     }
00285     return any_left;
00286   }
00287 
00288   bool get_incoming_data(int &actions) {
00289     FUNCDEF("get_incoming_data");
00290     if (!healthy()) return false;
00291     int first_one = true;
00292     bool saw_something = false;  // true if we got a packet.
00293     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00294       // pull in anything waiting.
00295       infoton *item = NIL;
00296       octopus_request_id req_id;
00297       outcome ret = retrieve_and_restore_any(item, req_id,
00298           first_one? DATA_AWAIT_TIMEOUT : 0);
00299       first_one = false;
00300       if (ret == cromp_common::TIMED_OUT) {
00301         actions--;  // didn't actually eat one.
00302         return false;
00303       } else if (ret != cromp_common::OKAY) {
00304 #ifdef DEBUG_CROMP_SERVER
00305         LOG(astring("got error ") + cromp_common::outcome_name(ret));
00306 #endif
00307         if (ret == cromp_common::NO_CONNECTION) {
00308 #ifdef DEBUG_CROMP_SERVER
00309           LOG("noticed disconnection of client.");
00310 #endif
00311           _still_connected = false;
00312         }
00313         actions--;  // didn't actually eat one.
00314         return false;  // get outa here.
00315       }
00316       // got a packet.
00317       saw_something = true;
00318       if (!_fixated) {
00319         if (req_id._entity.blank()) {
00320           LOG(astring("would have assigned ours to blank id! ")
00321               + req_id._entity.mangled_form());
00322           WHACK(item);
00323           continue;
00324         }
00325 #ifdef DEBUG_CROMP_SERVER
00326         LOG(astring("cmd with entity ") + req_id._entity.mangled_form());
00327 #endif
00328         if (_ent.blank()) {
00329           // assign the entity id now that we know it.
00330           _ent = req_id._entity;
00331 #ifdef DEBUG_CROMP_SERVER
00332           LOG(astring("assigned own entity to ") + _ent.mangled_form());
00333 #endif
00334         } else if (!_fixated && (_ent != req_id._entity) ) {
00335 #ifdef DEBUG_CROMP_SERVER
00336           LOG(astring("fixated on entity of ") + req_id._entity.mangled_form()
00337               + " where we used to have " + _ent.mangled_form());
00338 #endif
00339           _ent = req_id._entity;
00340           _fixated = true;
00341         }
00342       }  // connects to line after debug just below.
00343 #ifdef DEBUG_CROMP_SERVER
00344         else if (_ent != req_id._entity) {
00345         // this checks the validity of the entity.
00346 #ifdef DEBUG_CROMP_SERVER
00347         LOG(astring("seeing wrong entity of ") + req_id._entity.mangled_form()
00348             + " when we fixated on " + _ent.mangled_form());
00349 #endif
00350         WHACK(item);
00351         continue;
00352       }
00353 #endif
00354       // check again so we make sure we're still healthy; could have changed
00355       // state while getting a command.
00356       if (!healthy()) {
00357         WHACK(item);
00358         continue;
00359       }
00360       string_array classif = item->classifier();
00361         // hang onto the classifier since the next time we get a chance, the
00362         // object might be destroyed.
00363 
00364       // we pass responsibility for this item over to the octopus.  that's why
00365       // we're not deleting it once evaluate gets the item.
00366       ret = _octo->evaluate(item, req_id, _parent.instantaneous());
00367       if (ret != tentacle::OKAY) {
00368 #ifdef DEBUG_CROMP_SERVER
00369         LOG(astring("failed to evaluate the infoton we got: ")
00370             + classif.text_form());
00371 #endif
00372 //hmmm: we have upgraded this response to be for all errors, since otherwise
00373 //      clients will just time out waiting for something that's never coming.
00374 
00375         // we do a special type of handling when the tentacle is missing.  this
00376         // is almost always because the wrong type of request is being sent to
00377         // a server, or the server didn't register for all the objects it is
00378         // supposed to handle.
00380 //#ifdef DEBUG_CROMP_SERVER
00381           LOG(astring("injecting unhandled note into response stream for ")
00382               + req_id.text_form() + ", got outcome " + outcome_name(ret));
00383 //#endif
00384           _parent.send_to_client(req_id,
00385               new unhandled_request(req_id, classif, ret));
00386             // this will always work, although it's not a surety that the
00387             // client actually still exists.  probably though, since we're
00388             // just now handling this request.
00390       }
00391     }
00392     return saw_something;  // keep going if we actually did anything good.
00393   }
00394 
00395 private:
00396   cromp_server &_parent;  // the object that owns this client.
00397   octopus *_octo;
00398   octopus_entity _ent;  // the entity by which we know this client.
00399   bool _healthy;  // reports our current state of happiness.
00400   bool _fixated;  // true if the entity id has become firm.
00401   cromp_data_grabber _grabber;  // the data grabbing thread.
00402   infoton_list _waiting;
00403     // used by the push_client_replies() method; allocated once to avoid churn.
00404   bool _still_connected;
00405     // set to true up until we notice that the client disconnected.
00406   login_tentacle &_security_arm;  // provides login checking.
00407 };
00408 
00410 
00411 void cromp_data_grabber::perform_activity(void *)
00412 {
00413 #ifdef DEBUG_CROMP_SERVER
00414   FUNCDEF("perform_activity");
00415 #endif
00416   while (!should_stop()) {
00417 //    time_stamp started;
00418     bool ret = _parent.handle_client_needs(*this);
00419 //    int duration = int(time_stamp().value() - started.value());
00420     if (!ret) {
00421       // they said to stop.
00422 #ifdef DEBUG_CROMP_SERVER
00423       LOG("done handling client needs.");
00424 #endif
00425       _octo->expunge(_parent.ent());
00426       break;
00427     }
00428   }
00429 }
00430 
00432 
00433 class cromp_client_list : public amorph<cromp_client_record>
00434 {
00435 public:
00436   int find(const octopus_entity &to_find) const {
00437     for (int i = 0; i < elements(); i++)
00438       if (to_find == get(i)->ent()) return i;
00439     return common::NOT_FOUND;
00440   }
00441 };
00442 
00444 
00445 class client_dropping_thread : public ethread
00446 {
00447 public:
00448   client_dropping_thread (cromp_server &parent)
00449   : ethread(DROPPING_INTERVAL),
00450     _parent(parent) {}
00451 
00452   void perform_activity(void *formal(ptr)) {
00453 //    FUNCDEF("perform_activity");
00454     _parent.drop_dead_clients(); 
00455   }
00456 
00457 private:
00458   cromp_server &_parent;  // we perform tricks for this object.
00459 };
00460 
00462 
00463 class connection_management_thread : public ethread
00464 {
00465 public:
00466   connection_management_thread(cromp_server &parent)
00467   : ethread(),
00468     _parent(parent) {}
00469 
00470   void perform_activity(void *formal(ptr)) {
00471 //    FUNCDEF("perform_activity");
00472     _parent.look_for_clients(*this); 
00473   }
00474 
00475 private:
00476   cromp_server &_parent;  // we perform tricks for this object.
00477 };
00478 
00480 
// takes over access to the client list and root socket.  the guard object
// lives until the end of the scope in which the macro is used.
#undef LOCK_LISTS
#define LOCK_LISTS auto_synchronizer l(*_list_lock)
00484 
00485 cromp_server::cromp_server(const internet_address &where,
00486     int accepting_threads, bool instantaneous, int max_per_ent)
00487 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
00488   _clients(new cromp_client_list),
00489   _accepters(new thread_cabinet),
00490   _list_lock(new mutex),
00491   _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
00492   _instantaneous(instantaneous),
00493   _where(new internet_address(where)),
00494   _accepting_threads(accepting_threads),
00495   _dropper(new client_dropping_thread(*this)),
00496   _enabled(false),
00497   _encrypt_arm(NIL),
00498   _default_security(new cromp_security),
00499   _security_arm(NIL)
00500 {
00501 //  FUNCDEF("constructor");
00502 }
00503  
00504 cromp_server::~cromp_server()
00505 {
00506   disable_servers();
00507   WHACK(_accepters);
00508   WHACK(_dropper);
00509   WHACK(_clients);
00510   WHACK(_next_droppage);
00511   WHACK(_where);
00512   WHACK(_default_security);
00513   WHACK(_list_lock);
00514   _encrypt_arm = NIL;
00515   _security_arm = NIL;
00516 }
00517 
00518 internet_address cromp_server::location() const { return *_where; }
00519 
00520 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
00521 { return octo()->responses().get_sizes(id, items, bytes); }
00522 
00523 internet_address cromp_server::any_address(int port)
00524 {
00525   const abyte any_list[] = { 0, 0, 0, 0 };
00526   return internet_address(byte_array(4, any_list), "", port);
00527 }
00528 
00529 astring cromp_server::responses_text_form() const
00530 { return octo()->responses().text_form(); }
00531 
00532 int cromp_server::DEFAULT_ACCEPTERS() {
00533   // default number of listening threads; this is the maximum number of mostly
00534   // simultaneous connections that the server can pick up at a time.
00535   return 7;  // others are not generally so limited on resources.
00536 }
00537 
00538 infoton *cromp_server::wrap_infoton(infoton * &request,
00539     const octopus_entity &ent)
00540 {
00541   FUNCDEF("wrap_infoton");
00542   if (!_enabled) return NIL;
00543   // identity is not wrapped with encryption; we need to establish and identity
00544   // to talk on a distinct channel with the server.  even if that identity were
00545   // compromised, the interloper should still not be able to listen in on the
00546   // establishment of an encryption channel.  also, the encryption startup
00547   // itself is not encrypted and we don't want to re-encrypt the wrapper.
00548   if (dynamic_cast<identity_infoton *>(request)
00549       || dynamic_cast<encryption_infoton *>(request)
00550       || dynamic_cast<encryption_wrapper *>(request)) return NIL;
00551 
00552 #ifdef DEBUG_CROMP_SERVER
00553   LOG(astring("encrypting ") + request->text_form());
00554 #endif
00555 
00556   octenc_key_record *key = _encrypt_arm->keys().lock(ent);
00557     // lock here is released a bit down below.
00558   if (!key) {
00559     LOG(astring("failed to locate key for entity ") + ent.text_form());
00560     return NIL;
00561   }
00562   byte_array packed_request;
00563   infoton::fast_pack(packed_request, *request);
00564   WHACK(request);
00565   encryption_wrapper *to_return = new encryption_wrapper;
00566   key->_key.encrypt(packed_request, to_return->_wrapped);
00567   _encrypt_arm->keys().unlock(key);
00568   return to_return;
00569 }
00570 
00571 outcome cromp_server::enable_servers(bool encrypt, cromp_security *security)
00572 {
00573   FUNCDEF("enable_servers");
00574   if (encrypt) {
00575     // add the tentacles needed for encryption.
00576 #ifdef DEBUG_CROMP_SERVER
00577     LOG(astring("enabling encryption for ") + class_name()
00578         + " on " + _where->text_form());
00579 #endif
00580     _encrypt_arm = new encryption_tentacle;
00581     add_tentacle(_encrypt_arm, true);
00582     add_tentacle(new unwrapping_tentacle, false);
00583   }
00584   WHACK(_security_arm);  // in case being reused.
00585   if (security) {
00586     _security_arm = new login_tentacle(*security);
00587     add_tentacle(_security_arm, true);
00588   } else {
00589     _security_arm = new login_tentacle(*_default_security);
00590     add_tentacle(_security_arm, true);
00591   }
00592   open_common(*_where);  // open the common ground.
00593 
00594   _enabled = true;
00595   // try first accept, no waiting.
00596   outcome to_return = accept_one_client(false);
00597   if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
00598     LOG(astring("failure starting up server: ") + outcome_name(to_return));
00599     return to_return;
00600   }
00601 
00602 #ifdef DEBUG_CROMP_SERVER
00603   LOG(a_sprintf("adding %d accepting threads.", _accepting_threads));
00604 #endif
00605   for (int i = 0; i < _accepting_threads; i++) {
00606     // crank in a new thread and tell it yes on starting it.
00607     _accepters->add_thread(new connection_management_thread(*this), true, NIL);
00608   }
00609 
00610   _dropper->start(NIL);
00611   return OKAY;
00612 }
00613 
00614 void cromp_server::disable_servers()
00615 {
00616 //  FUNCDEF("disable_servers");
00617   if (!_enabled) return;
00618   _dropper->stop();  // signal the thread to leave when it can.
00619   _accepters->stop_all();  // signal the accepting threads to exit.
00620   if (_clients) {
00621     LOCK_LISTS;
00622       // make sure no one rearranges or uses the client list while we're
00623       // working on it.
00624     for (int i = 0; i < _clients->elements(); i++) {
00625       // stop the client's activities before the big shutdown.
00626       cromp_client_record *cli = (*_clients)[i];
00627       if (cli) cli->croak();
00628     }
00629   }
00630 
00631   close_common();  // zap the socket so that our blocked waiters get woken up.
00632 
00633   // now finalize the shutdown.  we don't grab the lock because we don't want
00634   // a deadlock, but we also shouldn't need to grab the lock.  by here, we have
00635   // cancelled all threads, no new clients should be able to be added, and the
00636   // destruction of this list will ensure that each client's thread really is
00637   // stopped.
00638   WHACK(_clients);
00639 
00640   _enabled = false;  // record our defunctivity.
00641 }
00642 
00643 int cromp_server::clients() const
00644 {
00645   LOCK_LISTS;
00646   return _clients? _clients->elements() : 0;
00647 }
00648 
00649 bool cromp_server::disconnect_entity(const octopus_entity &id)
00650 {
00651 //  FUNCDEF("disconnect_entity");
00652   if (!_enabled) return false;
00653   LOCK_LISTS;
00654   int indy = _clients->find(id);
00655   if (negative(indy)) return false;  // didn't find it.
00656   cromp_client_record *cli = (*_clients)[indy];
00657   // disconnect the client and zap its entity records.
00658   cli->croak();
00659   return true;
00660 }
00661 
00662 bool cromp_server::find_entity(const octopus_entity &id,
00663     internet_address &found)
00664 {
00665 //  FUNCDEF("find_entity");
00666   if (!_enabled) return false;
00667   found = internet_address();
00668   LOCK_LISTS;
00669   int indy = _clients->find(id);
00670   if (negative(indy)) return false;  // didn't find it.
00671   cromp_client_record *cli = (*_clients)[indy];
00672     // pull out the address from the record at that index.
00673   found = cli->spock()->remote();
00674   return true;
00675 }
00676 
00677 outcome cromp_server::accept_one_client(bool wait)
00678 {
00679 #ifdef DEBUG_CROMP_SERVER
00680   FUNCDEF("accept_one_client");
00681 #endif
00682   if (!_enabled) return common::INCOMPLETE;
00683   spocket *accepted = NIL;
00684 //printf((timestamp(true, true) + "into accept\n").s());
00685   outcome ret = spock()->accept(accepted, wait);
00686 //printf((timestamp(true, true) + "out of accept\n").s());
00687     // accept and wait for it to finish.
00688   if ( (ret == spocket::OKAY) && accepted) {
00689     // we got a new client to talk to.
00690     cromp_client_record *adding = new cromp_client_record(*this, accepted,
00691         octo(), *_security_arm);
00692 #ifdef DEBUG_CROMP_SERVER
00693     LOG(a_sprintf("found a new client on sock %d.", accepted->OS_socket()));
00694 #endif
00695     LOCK_LISTS;  // short term lock.
00696     _clients->append(adding);
00697     return OKAY;
00698   } else {
00699     if (ret == spocket::NO_CONNECTION)
00700       return NOT_FOUND;  // normal occurrence.
00701 #ifdef DEBUG_CROMP_SERVER
00702     LOG(astring("error accepting client: ") + spocket::outcome_name(ret));
00703 #endif
00704     return DISALLOWED;
00705   }
00706 }
00707 
00708 void cromp_server::look_for_clients(ethread &requestor)
00709 {
00710   FUNCDEF("look_for_clients");
00711   if (!_enabled) return;
00712   // see if any clients have been accepted.
00713   while (!requestor.should_stop()) {
00714     outcome ret = accept_one_client(false);
00715     if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
00716       // we got an error condition besides our normal set.
00717 //#ifdef DEBUG_CROMP_SERVER
00718       LOG(astring("got real error on socket; leaving for good.")
00719           + spocket::outcome_name(ret));
00720 //#endif
00721       break;
00722     }
00723     // if we weren't told we got a client, then we'll sleep.  if we did get
00724     // a client, we'll try again right away.
00725     if (ret != OKAY)
00726       time_control::sleep_ms(ACCEPTANCE_SNOOZE);
00727   }
00728 }
00729 
00730 outcome cromp_server::send_to_client(const octopus_request_id &id,
00731     infoton *data)
00732 {
00733 #ifdef DEBUG_CROMP_SERVER
00734   FUNCDEF("send_to_client");
00735 #endif
00736   if (!_enabled) return common::INCOMPLETE;
00737   if (!octo()->responses().add_item(data, id)) {
00738 #ifdef DEBUG_CROMP_SERVER
00739     LOG("failed to store result for client--no space left currently.");
00740 #endif
00741     return TOO_FULL;
00742   }
00743   return OKAY;
00744 }
00745 
00746 /*outcome cromp_server::get_any_from_client(const octopus_entity &ent,
00747     infoton * &data, int timeout)
00748 {
00749   FUNCDEF("get_from_client");
00750 //hmmm: this implementation locks the lists; can't we get the client to do
00751 //      most of the work for this?
00752   LOCK_LISTS;
00753   int indy = _clients->find(id._entity);
00754   if (negative(indy)) return NOT_FOUND;  // didn't find it.
00755   cromp_client_record *cli = (*_clients)[indy];
00756   octopus_request_id id;
00757   return cli->retrieve_and_restore_any(data, ent, timeout);
00758 }
00759 */
00760 
00761 outcome cromp_server::get_from_client(const octopus_request_id &id,
00762     infoton * &data, int timeout)
00763 {
00764 //  FUNCDEF("get_from_client");
00765   if (!_enabled) return common::INCOMPLETE;
00766 //hmmm: this implementation locks the lists; can't we get the client to do
00767 //      most of the work for this?
00768   LOCK_LISTS;
00769   int indy = _clients->find(id._entity);
00770   if (negative(indy)) return NOT_FOUND;  // didn't find it.
00771   cromp_client_record *cli = (*_clients)[indy];
00772   return cli->retrieve_and_restore(data, id, timeout);
00773 }
00774 
00775 void cromp_server::drop_dead_clients()
00776 {
00777 #ifdef DEBUG_CROMP_SERVER
00778   FUNCDEF("drop_dead_clients");
00779 #endif
00780   if (!_enabled) return;
00781   // clean out any dead clients.
00782 
00783   {
00784     LOCK_LISTS;
00785     if (time_stamp() < *_next_droppage) return;  // not time yet.
00786   }
00787 
00788   LOCK_LISTS;  // keep locked from now on.
00789   for (int i = 0; i < _clients->elements(); i++) {
00790     cromp_client_record *cli = (*_clients)[i];
00791     if (!cli) {
00792 #ifdef DEBUG_CROMP_SERVER
00793       LOG(astring("error in list structure."));
00794 #endif
00795       _clients->zap(i, i);
00796       i--;   // skip back before deleted guy.
00797       continue;
00798     }
00799     if (!cli->still_connected() || !cli->healthy()) {
00800 #ifdef DEBUG_CROMP_SERVER
00801       LOG(astring("dropping disconnected client ") + cli->ent().mangled_form());
00802 #endif
00803       cli->croak();  // stop it from operating.
00804 
00805 //hmmm: check if it has data waiting and complain about it perhaps.
00806       _clients->zap(i, i);
00807       i--;   // skip back before deleted guy.
00808       continue;
00809     }
00810   }
00811 
00812   _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
00813 }
00814 
00815 } //namespace.
00816 
Generated on Sat Jan 28 04:22:42 2012 for hoople2 project by  doxygen 1.6.3