cromp_server.cpp

Go to the documentation of this file.
00001 #ifndef CROMP_SERVER_IMPLEMENTATION_FILE
00002 #define CROMP_SERVER_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : cromp_server                                                      *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "cromp_common.h"
00019 #include "cromp_security.h"
00020 #include "cromp_server.h"
00021 
00022 #include <basis/function.h>
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/portable.h>
00026 #include <basis/mutex.h>
00027 #include <data_struct/amorph.cpp>
00028 #include <data_struct/unique_id.h>
00029 #include <mechanisms/ithread.h>
00030 #include <mechanisms/thread_cabinet.h>
00031 #include <octopus/entity_data_bin.h>
00032 #include <octopus/entity_defs.h>
00033 #include <octopus/identity_infoton.h>
00034 #include <octopus/infoton.h>
00035 #include <octopus/tentacle.h>
00036 #include <octopus/unhandled_request.h>
00037 #include <sockets/address.h>
00038 #include <sockets/tcpip_stack.h>
00039 #include <sockets/spocket.h>
00040 #include <tentacles/key_repository.h>
00041 #include <tentacles/login_tentacle.h>
00042 
00043 #ifndef OMIT_CRYPTO_SUPPORT
00044   #include <tentacles/encryption_tentacle.h>
00045   #include <tentacles/encryption_wrapper.h>
00046 #endif
00047 
00048 //#define DEBUG_CROMP_SERVER
00049   // uncomment for noisy version.
00050 
00051 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
00052   // we will drop any clients that have disconnected this long ago.
00053 
00054 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
00055   // this is the maximum number of things we'll do in one run for a
00056   // client, including both sends and receives.
00057 
00058 const int SEND_TRIES_ALLOWED = 1;
00059   // the number of attempts we will make to get outgoing data to send.
00060 
00061 const int SEND_THRESHOLD = 512 * KILOBYTE;
00062   // if we pile up some data to this point in our client gathering, we'll
00063   // go ahead and start pushing it to the client.
00064 
00065 const int EXTREME_SEND_TRIES_ALLOWED = 28;
00066   // if we're clogged, we'll push this many times to get data out.
00067 
00068 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
00069   // the maximum size we want our buffer to grow.
00070 
00071 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
00072   // the largest chunk of updates we'll try to grab at one time.
00073 
00074 const int DROPPING_INTERVAL = 500;
00075   // the rate at which we'll check for dead clients and clean up.
00076 
00077 const int DATA_AWAIT_TIMEOUT = 14;
00078   // how long the server zones out waiting for data.
00079 
00080 const int ACCEPTANCE_SNOOZE = 60;
00081   // if the server sees no clients, it will take a little nap.
00082 
00083 #undef LOG
00084 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
00085 
00087 
00088 // forward.
00089 class cromp_client_record;
00090 
00091 class cromp_data_grabber : public ithread
00092 {
00093 public:
00094   cromp_data_grabber(cromp_client_record &parent, octopus *octo)
00095       : ithread(), _parent(parent), _octo(octo) {}
00096 
00097   IMPLEMENT_CLASS_NAME("cromp_data_grabber");
00098 
00099   virtual void perform_activity(void *);
00100 
00101 private:
00102   cromp_client_record &_parent;
00103   octopus *_octo;
00104 };
00105  
00107 
00108 class cromp_client_record : public cromp_common
00109 {
00110 public:
00111   cromp_client_record(cromp_server &parent, spocket *client, octopus *octo)
00112   : cromp_common(client, octo),
00113     _parent(parent),
00114     _octo(octo),
00115     _ent(),
00116     _healthy(true),
00117     _fixated(false),
00118     _grabber(*this, octo),
00119     _waiting(),
00120     _still_connected(true)
00121   {
00122     internet_address local_addr = internet_address
00123         (internet_address::localhost(), client->stack().hostname(), 0);
00124     open_common(local_addr);  // open the common support for biz.
00125     _grabber.start(NIL);  // crank up our background data pump on the socket.
00126   }
00127 
00128   ~cromp_client_record() {
00129     croak();
00130   }
00131 
00132   IMPLEMENT_CLASS_NAME("cromp_client_record");
00133 
00134   bool handle_client_needs(ithread &prompter) {
00135     FUNCDEF("handle_client_needs");
00136 #ifdef DEBUG_CROMP_SERVER
00137     time_stamp start;
00138 #endif
00139     if (!_healthy) return false;  // done.
00140     if (!spock()->connected()) {
00141       _still_connected = false;
00142       return false;  // need to stop now.
00143     }
00144     bool keep_going = true;
00145     int actions = 0;
00146     while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
00147       // make sure we don't overstay our welcome when the thread's supposed
00148       // to quit.
00149       if (prompter.should_stop()) return false;
00150       keep_going = false;  // only continue if there's a reason.
00151       bool ret = get_incoming_data(actions);  // look for requests.
00152       if (ret) keep_going = true;
00153       ret = push_client_replies(actions);  // send replies back to the client.
00154       if (ret) keep_going = true;
00155     }
00156 
00157 
00158 #ifdef DEBUG_CROMP_SERVER
00159     if (actions > 10) {
00160       LOG(isprintf("actions=%d", actions));
00161       LOG(isprintf("%d pending send bytes, %d bytes accumulated, bin has "
00162           "%d items.", pending_sends(), accumulated_bytes(),
00163           octo()->responses().items_held()));
00164 
00165     int duration = int(time_stamp().value() - start.value());
00166     if (duration > 200) {
00167       LOG(isprintf("duration=%d ms.", duration));
00168     }
00169 #endif
00170 
00171     return true;
00172   }
00173 
00174   const octopus_entity &ent() const { return _ent; }
00175 
00176   // stops the background activity of this object and drops the connection
00177   // to the client.
00178   void croak() {
00179     FUNCDEF("croak");
00180     _grabber.stop();
00181     int actions = 0;
00182     while (get_incoming_data(actions)) {
00183       // keep receiving whatever's there already.  we are trying to drain
00184       // the socket before destroying it.
00185     }
00186     _healthy = false;
00187     close_common();
00188   }
00189 
00190   bool healthy() const { return _healthy; }
00191     // this is true unless the object has been told to shut down.
00192 
00193   bool still_connected() const { return _still_connected; }
00194     // this is true unless the client side dropped the connection.
00195 
00196   cromp_server &parent() const { return _parent; }
00197 
00198   bool push_client_replies(int &actions) {
00199     FUNCDEF("push_client_replies");
00200     if (!healthy()) return false;
00201     if (ent().blank()) {
00202       // not pushing replies if we haven't even gotten a command yet.
00203 #ifdef DEBUG_CROMP_SERVER
00204       LOG("not pushing replies for blank.");
00205 #endif
00206       return false;
00207     }
00208 
00209     if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00210 LOG("buffer clog being cleared now.");
00211       // the buffers are pretty full; we'll try later.
00212       push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
00213       // if we're still clogged, then leave.
00214       if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
00215 LOG("could not completely clear buffer clog.");
00216         return true;
00217       }
00218 LOG("cleared out buffer clog.");
00219     }
00220 
00221     int any_left = true;
00222     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00223       // make sure we're not wasting our time.
00224       if (!_octo->responses().items_held()) {
00225         any_left = false;
00226         break;
00227       }
00228       // make sure we don't ignore receptions.
00229       grab_anything(false);
00230       // try to grab a result for this entity.
00231       int num_located = _octo->responses().acquire_for_entity(ent(),
00232           _waiting, MAXIMUM_SIZE_BATCH);
00233       if (!num_located) {
00234         any_left = false;
00235         break;
00236       }
00237 
00238 #ifndef OMIT_CRYPTO_SUPPORT
00239       // if we're encrypting, we need to wrap these as well.
00240       if (_parent.encrypting()) {
00241         for (int i = 0; i < _waiting.elements(); i++) {
00242           infoton *curr = _waiting[i]->_data;
00243           infoton *processed = _parent.wrap_infoton(curr,
00244               _waiting[i]->_id._entity);
00245           if (processed) _waiting[i]->_data = processed;  // replace infoton.
00246         }
00247       }
00248 #endif
00249 
00250       outcome ret = pack_and_ship(_waiting, 0);
00251         // no attempt to send yet; we're just stuffing the buffer.
00252       if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
00253 //hmmm: what about keeping transmission as held in list; retry later on it?
00254 
00255 //#ifdef DEBUG_CROMP_SERVER
00256         LOG(istring("failed to send package back to client: ")
00257             + cromp_common::outcome_name(ret));
00258 //#endif
00259         any_left = false;
00260         break;
00261       }
00262 
00263       if (pending_sends() > SEND_THRESHOLD) {
00264 #ifdef DEBUG_CROMP_SERVER
00265         LOG(istring("over sending threshold on ") + _ent.text_form());
00266 #endif
00267         push_outgoing(SEND_TRIES_ALLOWED);
00268       }
00269 
00270     }
00271     // now that we've got a pile possibly, we'll try to send them out.
00272     push_outgoing(SEND_TRIES_ALLOWED);
00273     if (!spock()->connected()) {
00274 #ifdef DEBUG_CROMP_SERVER
00275       LOG("noticed disconnection of client.");
00276 #endif
00277       _still_connected = false;
00278     }
00279     return any_left;
00280   }
00281 
00282   bool get_incoming_data(int &actions) {
00283     FUNCDEF("get_incoming_data");
00284     if (!healthy()) return false;
00285     int first_one = true;
00286     bool saw_something = false;  // true if we got a packet.
00287     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
00288       // pull in anything waiting.
00289       infoton *item = NIL;
00290       octopus_request_id req_id;
00291       outcome ret = retrieve_and_restore_any(item, req_id,
00292           first_one? DATA_AWAIT_TIMEOUT : 0);
00293       first_one = false;
00294       if (ret == cromp_common::TIMED_OUT) {
00295         actions--;  // didn't actually eat one.
00296         return false;
00297       } else if (ret != cromp_common::OKAY) {
00298 #ifdef DEBUG_CROMP_SERVER
00299         LOG(istring("got error ") + cromp_common::outcome_name(ret));
00300 #endif
00301         if (ret == cromp_common::NO_CONNECTION) {
00302 #ifdef DEBUG_CROMP_SERVER
00303           LOG("noticed disconnection of client.");
00304 #endif
00305           _still_connected = false;
00306         }
00307         actions--;  // didn't actually eat one.
00308         return false;  // get outa here.
00309       }
00310       // got a packet.
00311       saw_something = true;
00312       if (!_fixated) {
00313         if (req_id._entity.blank()) {
00314           LOG(istring("would have assigned ours to blank id! ")
00315               + req_id._entity.mangled_form());
00316           WHACK(item);
00317           continue;
00318         }
00319 #ifdef DEBUG_CROMP_SERVER
00320         LOG(istring("cmd with entity ") + req_id._entity.mangled_form());
00321 #endif
00322         if (_ent.blank()) {
00323           // assign the entity id now that we know it.
00324           _ent = req_id._entity;
00325 #ifdef DEBUG_CROMP_SERVER
00326           LOG(istring("assigned own entity to ") + _ent.mangled_form());
00327 #endif
00328         } else if (!_fixated && (_ent != req_id._entity) ) {
00329 #ifdef DEBUG_CROMP_SERVER
00330           LOG(istring("fixated on entity of ") + req_id._entity.mangled_form()
00331               + " where we used to have " + _ent.mangled_form());
00332 #endif
00333           _ent = req_id._entity;
00334           _fixated = true;
00335         }
00336       }  // connects to line after debug just below.
00337 #ifdef DEBUG_CROMP_SERVER
00338         else if (_ent != req_id._entity) {
00339         // this checks the validity of the entity.
00340 #ifdef DEBUG_CROMP_SERVER
00341         LOG(istring("seeing wrong entity of ") + req_id._entity.mangled_form()
00342             + " when we fixated on " + _ent.mangled_form());
00343 #endif
00344         WHACK(item);
00345         continue;
00346       }
00347 #endif
00348       // check again so we make sure we're still healthy; could have changed
00349       // state while getting a command.
00350       if (!healthy()) {
00351         WHACK(item);
00352         continue;
00353       }
00354       string_array classif = item->classifier();
00355         // hang onto the classifier since the next time we get a chance, the
00356         // object might be destroyed.
00357 
00358       // we pass responsibility for this item over to the octopus.  that's why
00359       // we're not deleting it once evaluate gets the item.
00360       ret = _octo->evaluate(item, req_id, _parent.instantaneous());
00361       if (ret != tentacle::OKAY) {
00362 #ifdef DEBUG_CROMP_SERVER
00363         LOG(istring("failed to evaluate the infoton we got: ")
00364             + classif.text_form());
00365 #endif
00366 //hmmm: we have upgraded this response to be for all errors, since otherwise
00367 //      clients will just time out waiting for something that's never coming.
00368 
00369         // we do a special type of handling when the tentacle is missing.  this
00370         // is almost always because the wrong type of request is being sent to
00371         // a server, or the server didn't register for all the objects it is
00372         // supposed to handle.
00374 //#ifdef DEBUG_CROMP_SERVER
00375           LOG(istring("injecting unhandled note into response stream for ")
00376               + req_id.text_form() + ", got outcome " + outcome_name(ret));
00377 //#endif
00378           _parent.send_to_client(req_id,
00379               new unhandled_request(req_id, classif, ret));
00380             // this will always work, although it's not a surety that the
00381             // client actually still exists.  probably though, since we're
00382             // just now handling this request.
00384       }
00385     }
00386     return saw_something;  // keep going if we actually did anything good.
00387   }
00388 
00389 private:
00390   cromp_server &_parent;  // the object that owns this client.
00391   octopus *_octo;
00392   octopus_entity _ent;  // the entity by which we know this client.
00393   bool _healthy;  // reports our current state of happiness.
00394   bool _fixated;  // true if the entity id has become firm.
00395   cromp_data_grabber _grabber;  // the data grabbing thread.
00396   infoton_list _waiting;
00397     // used by the push_client_replies() method; allocated once to avoid churn.
00398   bool _still_connected;
00399     // set to true up until we notice that the client disconnected.
00400 };
00401 
00403 
00404 void cromp_data_grabber::perform_activity(void *)
00405 {
00406   FUNCDEF("perform_activity");
00407   while (!should_stop()) {
00408 //    time_stamp started;
00409     bool ret = _parent.handle_client_needs(*this);
00410 //    int duration = int(time_stamp().value() - started.value());
00411     if (!ret) {
00412       // they said to stop.
00413 #ifdef DEBUG_CROMP_SERVER
00414       LOG("done handling client needs.");
00415 #endif
00416       _octo->expunge(_parent.ent());
00417       break;
00418     }
00419   }
00420 }
00421 
00423 
00424 class cromp_client_list : public amorph<cromp_client_record>
00425 {
00426 public:
00427   int find(const octopus_entity &to_find) const {
00428     for (int i = 0; i < elements(); i++)
00429       if (to_find == get(i)->ent()) return i;
00430     return common::NOT_FOUND;
00431   }
00432 };
00433 
00435 
00436 class client_dropping_thread : public ithread
00437 {
00438 public:
00439   client_dropping_thread (cromp_server &parent)
00440   : ithread(DROPPING_INTERVAL),
00441     _parent(parent) {}
00442 
00443   void perform_activity(void *formal(ptr)) {
00444     FUNCDEF("perform_activity");
00445     _parent.drop_dead_clients(); 
00446   }
00447 
00448 private:
00449   cromp_server &_parent;  // we perform tricks for this object.
00450 };
00451 
00453 
00454 class connection_management_thread : public ithread
00455 {
00456 public:
00457   connection_management_thread(cromp_server &parent)
00458   : ithread(),
00459     _parent(parent) {}
00460 
00461   void perform_activity(void *formal(ptr)) {
00462     FUNCDEF("perform_activity");
00463     _parent.look_for_clients(*this); 
00464   }
00465 
00466 private:
00467   cromp_server &_parent;  // we perform tricks for this object.
00468 };
00469 
00471 
00472 #undef LOCK_LISTS
00473 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
00474   // takes over access to the client list and root socket.
00475 
00476 cromp_server::cromp_server(const internet_address &where,
00477     int accepting_threads, bool instantaneous, int max_per_ent)
00478 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
00479   _clients(new cromp_client_list),
00480   _accepters(new thread_cabinet),
00481   _list_lock(new mutex),
00482   _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
00483   _instantaneous(instantaneous),
00484   _where(new internet_address(where)),
00485   _accepting_threads(accepting_threads),
00486   _dropper(new client_dropping_thread(*this)),
00487   _enabled(false),
00488   _encrypt_arm(NIL),
00489   _default_security(new cromp_security)
00490 {
00491   FUNCDEF("constructor");
00492 }
00493  
00494 cromp_server::~cromp_server()
00495 {
00496   disable_servers();
00497   _encrypt_arm = NIL;
00498   WHACK(_accepters);
00499   WHACK(_dropper);
00500   WHACK(_clients);
00501   WHACK(_next_droppage);
00502   WHACK(_where);
00503   WHACK(_default_security);
00504   WHACK(_list_lock);
00505 }
00506 
00507 internet_address cromp_server::location() const { return *_where; }
00508 
00509 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
00510 { return octo()->responses().get_sizes(id, items, bytes); }
00511 
00512 internet_address cromp_server::any_address(int port)
00513 {
00514   const byte any_list[] = { 0, 0, 0, 0 };
00515   return internet_address(byte_array(4, any_list), "", port);
00516 }
00517 
00518 istring cromp_server::responses_text_form() const
00519 { return octo()->responses().text_form(); }
00520 
00521 int cromp_server::DEFAULT_ACCEPTERS() {
00522   // default number of listening threads; this is the maximum number of mostly
00523   // simultaneous connections that the server can pick up at a time.
00524 #ifdef EMBEDDED_BUILD
00525   return 1;  // calm number of threads for embedded systems.
00526 #else
00527   return 7;  // others are not generally so limited on resources.
00528 #endif
00529 }
00530 
00531 #ifndef OMIT_CRYPTO_SUPPORT
00532 infoton *cromp_server::wrap_infoton(infoton * &request,
00533     const octopus_entity &ent)
00534 {
00535   FUNCDEF("wrap_infoton");
00536   // identity is not wrapped with encryption; we need to establish and identity
00537   // to talk on a distinct channel with the server.  even if that identity were
00538   // compromised, the interloper should still not be able to listen in on the
00539   // establishment of an encryption channel.  also, the encryption startup
00540   // itself is not encrypted and we don't want to re-encrypt the wrapper.
00541   if (dynamic_cast<identity_infoton *>(request)
00542       || dynamic_cast<encryption_infoton *>(request)
00543       || dynamic_cast<encryption_wrapper *>(request)) return NIL;
00544 
00545 #ifdef DEBUG_CROMP_SERVER
00546   LOG(istring("encrypting ") + request->text_form());
00547 #endif
00548 
00549   octenc_key_record *key = _encrypt_arm->keys().lock(ent);
00550     // lock here is released a bit down below.
00551   if (!key) {
00552     LOG(istring("failed to locate key for entity ") + ent.text_form());
00553     return NIL;
00554   }
00555   byte_array packed_request;
00556   infoton::fast_pack(packed_request, *request);
00557   WHACK(request);
00558   encryption_wrapper *to_return = new encryption_wrapper;
00559   key->_key.encrypt(packed_request, to_return->_wrapped);
00560   _encrypt_arm->keys().unlock(key);
00561   return to_return;
00562 }
00563 #endif
00564 
00565 void cromp_server::enable_servers(bool encrypt, cromp_security *security)
00566 {
00567   FUNCDEF("enable_servers");
00568   if (encrypt) {
00569 #ifndef OMIT_CRYPTO_SUPPORT
00570     // add the tentacles needed for encryption.
00571 #ifdef DEBUG_CROMP_SERVER
00572     LOG(istring("enabling encryption for ") + class_name()
00573         + " on " + _where->text_form());
00574 #endif
00575     _encrypt_arm = new encryption_tentacle;
00576     add_tentacle(_encrypt_arm, true);
00577     add_tentacle(new unwrapping_tentacle, false);
00578 #endif
00579   }
00580   if (security) {
00581     add_tentacle(new login_tentacle(*security), true);
00582   } else {
00583     add_tentacle(new login_tentacle(*_default_security), true);
00584   }
00585   open_common(*_where);  // open the common ground.
00586 
00587   accept_one_client(false);  // try first accept, no waiting.
00588 
00589 #ifdef DEBUG_CROMP_SERVER
00590   LOG(isprintf("adding %d accepting threads.", _accepting_threads));
00591 #endif
00592   for (int i = 0; i < _accepting_threads; i++) {
00593     // crank in a new thread and tell it yes on starting it.
00594     _accepters->add_thread(new connection_management_thread(*this), true, NIL);
00595   }
00596 
00597   _dropper->start(NIL);
00598   _enabled = true;
00599 }
00600 
00601 void cromp_server::disable_servers()
00602 {
00603   FUNCDEF("disable_servers");
00604   _dropper->stop();  // signal the thread to leave when it can.
00605   _accepters->stop_all();  // signal the accepting threads to exit.
00606   if (_clients) {
00607     LOCK_LISTS;
00608       // make sure no one rearranges or uses the client list while we're
00609       // working on it.
00610     for (int i = 0; i < _clients->elements(); i++) {
00611       // stop the client's activities before the big shutdown.
00612       cromp_client_record *cli = (*_clients)[i];
00613       if (cli) cli->croak();
00614     }
00615   }
00616 
00617   close_common();  // zap the socket so that our blocked waiters get woken up.
00618 
00619   // now finalize the shutdown.  we don't grab the lock because we don't want
00620   // a deadlock, but we also shouldn't need to grab the lock.  by here, we have
00621   // cancelled all threads, no new clients should be able to be added, and the
00622   // destruction of this list will ensure that each client's thread really is
00623   // stopped.
00624   WHACK(_clients);
00625 
00626   _enabled = false;  // record our defunctivity.
00627 }
00628 
00629 int cromp_server::clients() const
00630 {
00631   LOCK_LISTS;
00632   return _clients? _clients->elements() : 0;
00633 }
00634 
00635 bool cromp_server::find_entity(const octopus_entity &id,
00636     internet_address &found)
00637 {
00638   FUNCDEF("find_entity");
00639   found = internet_address();
00640   LOCK_LISTS;
00641   int indy = _clients->find(id);
00642   if (negative(indy)) return false;  // didn't find it.
00643   cromp_client_record *cli = (*_clients)[indy];
00644     // pull out the address from the record at that index.
00645   found = cli->spock()->remote();
00646   return true;
00647 }
00648 
00649 outcome cromp_server::accept_one_client(bool wait)
00650 {
00651   FUNCDEF("accept_one_client");
00652   spocket *accepted = NIL;
00653 //printf((timestamp(true, true) + "into accept\n").s());
00654   outcome ret = spock()->accept(accepted, wait);
00655 //printf((timestamp(true, true) + "out of accept\n").s());
00656     // accept and wait for it to finish.
00657   if ( (ret == spocket::OKAY) && accepted) {
00658     // we got a new client to talk to.
00659     cromp_client_record *adding = new cromp_client_record(*this, accepted,
00660         octo());
00661 #ifdef DEBUG_CROMP_SERVER
00662     LOG(isprintf("found a new client on sock %d.", accepted->OS_socket()));
00663 #endif
00664     LOCK_LISTS;  // short term lock.
00665     _clients->append(adding);
00666     return OKAY;
00667   } else {
00668     if (ret == spocket::NO_CONNECTION)
00669       return NOT_FOUND;  // normal occurrence.
00670 #ifdef DEBUG_CROMP_SERVER
00671     LOG(istring("error accepting client: ") + spocket::outcome_name(ret));
00672 #endif
00673     return DISALLOWED;
00674   }
00675 }
00676 
00677 void cromp_server::look_for_clients(ithread &requestor)
00678 {
00679   FUNCDEF("look_for_clients");
00680   // see if any clients have been accepted.
00681   while (!requestor.should_stop()) {
00682     outcome ret = accept_one_client(false);
00683     if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
00684       // we got an error condition besides our normal set.
00685 //#ifdef DEBUG_CROMP_SERVER
00686       LOG(istring("got real error on socket; leaving for good.")
00687           + spocket::outcome_name(ret));
00688 //#endif
00689       break;
00690     }
00691     // if we weren't told we got a client, then we'll sleep.  if we did get
00692     // a client, we'll try again right away.
00693     if (ret != OKAY)
00694       portable::sleep_ms(ACCEPTANCE_SNOOZE);
00695   }
00696 }
00697 
00698 outcome cromp_server::send_to_client(const octopus_request_id &id,
00699     infoton *data)
00700 {
00701   FUNCDEF("send_to_client");
00702   if (!octo()->responses().add_item(data, id)) {
00703 #ifdef DEBUG_CROMP_SERVER
00704     LOG("failed to store result for client--no space left currently.");
00705 #endif
00706     return TOO_FULL;
00707   }
00708   return OKAY;
00709 }
00710 
00711 outcome cromp_server::get_from_client(const octopus_request_id &id,
00712     infoton * &data, int timeout)
00713 {
00714   FUNCDEF("get_from_client");
00715 //hmmm: this implementation locks the lists; can't we get the client to do
00716 //      most of the work for this?
00717   LOCK_LISTS;
00718   int indy = _clients->find(id._entity);
00719   if (negative(indy)) return NOT_FOUND;  // didn't find it.
00720   cromp_client_record *cli = (*_clients)[indy];
00721   return cli->retrieve_and_restore(data, id, timeout);
00722 }
00723 
00724 void cromp_server::drop_dead_clients()
00725 {
00726   FUNCDEF("drop_dead_clients");
00727   // clean out any dead clients.
00728 
00729   {
00730     LOCK_LISTS;
00731     if (time_stamp() < *_next_droppage) return;  // not time yet.
00732   }
00733 
00734   LOCK_LISTS;  // keep locked from now on.
00735   for (int i = 0; i < _clients->elements(); i++) {
00736     cromp_client_record *cli = (*_clients)[i];
00737     if (!cli) {
00738 #ifdef DEBUG_CROMP_SERVER
00739       LOG(istring("error in list structure."));
00740 #endif
00741       _clients->zap(i, i);
00742       i--;   // skip back before deleted guy.
00743       continue;
00744     }
00745     if (!cli->still_connected() || !cli->healthy()) {
00746 #ifdef DEBUG_CROMP_SERVER
00747       LOG(istring("dropping disconnected client ") + cli->ent().mangled_form());
00748 #endif
00749       cli->croak();  // stop it from operating.
00750 
00751 //hmmm: check if it has data waiting and complain about it perhaps.
00752       _clients->zap(i, i);
00753       i--;   // skip back before deleted guy.
00754       continue;
00755     }
00756   }
00757 
00758   _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
00759 }
00760 
00761 
00762 #endif //CROMP_SERVER_IMPLEMENTATION_FILE
00763 

Generated on Thu Aug 28 04:32:49 2008 for HOOPLE Libraries by  doxygen 1.5.1