socket_minder.cpp

Go to the documentation of this file.
00001 #ifndef SOCKET_MINDER_IMPLEMENTATION_FILE
00002 #define SOCKET_MINDER_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : socket_minder                                                     *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 1999-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "raw_socket.h"
00019 #include "socket_data.h"
00020 #include "socket_minder.h"
00021 #include "tcpip_definitions.h"
00022 #include "tcpip_stack.h"
00023 
00024 #include <basis/mutex.h>
00025 #include <basis/log_base.h>
00026 #include <basis/portable.h>
00027 #include <basis/set.cpp>
00028 #include <data_struct/amorph.cpp>
00029 #include <data_struct/unique_id.h>
00030 #include <mechanisms/ithread.h>
00031 #include <post_office/mailbox.h>
00032 #include <post_office/os_event.h>
00033 #include <post_office/post_office.h>
00034 
00035 #include <errno.h>
00036 #ifdef __WIN32__
00037   #include <ws2tcpip.h>
00038 #endif
00039 #ifdef __UNIX__
00040   #include <arpa/inet.h>
00041   #include <sys/socket.h>
00042 #endif
00043 
00044 //#define DEBUG_SOCKET_MINDER
00045   // uncomment for noisiness: this enables the extra LOG calls that are
        // wrapped in "#ifdef DEBUG_SOCKET_MINDER" throughout this file.
00046 
      // replace any earlier LOG macro so that logging in this file goes through
      // CLASS_EMERGENCY_LOG with the program-wide logger.
00047 #undef LOG
00048 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
00049 
00050 const int SOCKET_CHECK_INTERVAL = 50;
00051   // we will scoot around in our sockets this frequently.  the value is handed
        // to the ithread constructor by socket_minder_prompter.  presumably this
        // is in milliseconds -- TODO confirm against ithread.
00052 
00053 const int SOCKMIND_MAXIMUM_RECEIVES = 10;
00054   // we'll receive this many items from the socket in one go; it caps the
        // number of recv() attempts made by a single push_receives() call.
00055 
00056 const int MAXIMUM_TRANSFER_CHUNK = 512 * KILOBYTE;
00057   // largest block of data we'll try to deal with at one time; this is the
        // size the receive buffer is reset to before each recv() call.
00058 
00059 const int CONN_ALERT_INTERVAL = 100;
00060   // this is the most frequently that we will generate a connection checking
00061   // event.  handle_pending_connecters() compares a socket's last alert time
        // against a time_stamp built from the negated interval.
00062 
00063 const int MULTI_SELECT_TIMEOUT = 250;
00064   // the snooze on select will occur for this long.  during this interval,
00065   // it is likely no new sockets will be considered, since snoozy_select()
        // gathers its socket lists before the select call is made.
00066 
00068 
      // a named amorph holding socket_data objects.  this is the type allocated
      // for socket_minder's _socket_list member.
00069 class socket_data_amorph : public amorph<socket_data> {};
00070 
00072 
      // thread object that periodically prods a socket_minder.  each time the
      // ithread machinery invokes perform_activity(), one snoozy_select() pass
      // is run on the parent.
      //
      // the thread is deliberately NOT started by this constructor.  the owning
      // socket_minder calls start() from its own constructor body, which runs
      // only after every one of its members has been initialized.  starting the
      // thread here as well would call start() twice and could let
      // snoozy_select() run against a parent that is still being constructed.
00073 class socket_minder_prompter : public ithread
00074 {
00075 public:
        // parent: the socket_minder whose snoozy_select() will be invoked.  only
        // a reference is kept, so the parent must outlive this object.
00076   socket_minder_prompter(socket_minder &parent)
00077   : ithread(SOCKET_CHECK_INTERVAL, ithread::SLACK_INTERVAL),
00078     _parent(parent)
00079   {
00080     // intentionally empty; the owner starts the thread (see class comment).
00081   }
00082 
00083   ~socket_minder_prompter() {
00084     stop();  // shut down our thread.
00085   }
00086 
        // runs one servicing pass on the parent; the thread argument is unused.
00087   virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
00088 
00089 private:
00090   socket_minder &_parent;  // the object we're hooked to.
00091 };
00092 
00094 
      // constructs a socket_minder that reports socket activity via "post".
      //   post:         post office that fire_event() drops OS_events into.
      //   parent_route: destination passed to fire_event() for the events
      //                 generated in this file.
      //   event_type:   first argument given to each OS_event that is created.
      //   message:      value passed as "to_fire" whenever an event is raised.
      // the helper objects are allocated with new here and are released in the
      // destructor.
00095 socket_minder::socket_minder(post_office &post, int parent_route,
00096     int event_type, int message)
00097 : _post(post),
00098   _parent_route(parent_route),
00099   _event_type(event_type),
00100   _lock(new mutex),
00101   _socket_list(new socket_data_amorph),
00102   _socks(new raw_socket),
00103   _stack(new tcpip_stack),
00104   _message(message),
00105   _pending_sox(new int_set),
00106   _prompter(new socket_minder_prompter(*this))
00107 {
        // ask the prompter thread to run; at this point the whole initializer
        // list above has completed.
00108   _prompter->start(NIL);
00109 }
00110 
      // stops the prompter thread and then releases every helper object that
      // the constructor allocated.
00111 socket_minder::~socket_minder()
00112 {
        // stop the thread first, since its activity (snoozy_select) uses the
        // members that are released below.
00113   _prompter->stop();
        // WHACK presumably deletes the object and clears the pointer -- TODO
        // confirm against the macro's definition.
00114   WHACK(_prompter);
00115   WHACK(_socket_list);
00116   WHACK(_lock);
00117   WHACK(_pending_sox);
00118   WHACK(_socks); 
00119   WHACK(_stack);
00120 }
00121 
      // asks the prompter thread to stop.  nothing else is changed here: the
      // socket list and the pending-server set are left as they are.
00122 void socket_minder::disengage()
00123 {
00124   _prompter->stop();
00125 }
00126 
      // returns a textual dump of all tracked sockets: the text_form() of each
      // socket_data in list order, separated by log_base::platform_ending().
      // no separator follows the final entry, and an empty list produces an
      // empty string.  the lock is held while the list is walked.
00127 istring socket_minder::text_form() const
00128 {
00129   auto_synchronizer l(*_lock);
00130   istring to_return;
00131 
00132   for (int i = 0; i < _socket_list->elements(); i++) {
00133     const socket_data *curr = _socket_list->get(i);
00134     to_return += curr->text_form();
          // add a line ending between entries, but not after the last one.
00135     if (i != _socket_list->elements() - 1)
00136       to_return += log_base::platform_ending();
00137   }
00138 
00139   return to_return;
00140 }
00141 
      // one servicing pass over all tracked sockets; this is what
      // socket_minder_prompter::perform_activity() invokes.  the pass:
      //   1) snapshots the tracked sockets via get_sockets(),
      //   2) services sockets whose connection is still pending,
      //   3) performs one select across the remaining sockets, and
      //   4) pumps receives and sends on the sockets left in the arrays.
00142 void socket_minder::snoozy_select()
00143 {
00144   FUNCDEF("snoozy_select");
00145   int_array read_sox;
00146   int_array write_sox;
00147   int_array pending;
00148 
        // get_sockets() takes the lock only while copying socket numbers out, so
        // each socket is looked up again (under the lock) before it is touched.
00149   get_sockets(read_sox, write_sox, pending);
00150 
00151   // process any with pending connections right now, rather than later.
00152   for (int p = 0; p < pending.length(); p++) {
00153     socket_data *sd = lock_socket_data(pending[p]);
00154     if (!sd) continue;  // something hosed there.
00155     handle_pending_connecters(*sd);
00156     unlock_socket_data(sd);
00157   }
00158 
00159   // now select on all of our sockets simultaneously.
        // presumably select() trims both arrays down to the sockets that are
        // actually ready -- TODO confirm against raw_socket::select.
00160   int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
00161   if (!ret || (!read_sox.length() && !write_sox.length()) ) {
00162     return;  // nothing happened.
00163   }
00164 
00165   // rotate through the lists and push socket_minders around as needed.
00166   // any sockets we have events for but no socket_data are orphans and will
00167   // be ignored.
00168 
00169   // check read sockets.
00170   for (int r = 0; r < read_sox.length(); r++) {
00171     const int sock = read_sox[r];
00172     if (owns_socket(sock)) {
00173       socket_data *sd = lock_socket_data(sock);
            // the socket can vanish between owns_socket() and the lock attempt;
            // in that case the entry is simply left in the array.
00174       if (!sd) continue;  // something hosed there.
00175       push_receives(*sd, SI_READABLE);
00176       unlock_socket_data(sd);
            // drop the handled entry; the index steps back so that the element
            // which slides into slot r is not skipped.
00177       read_sox.zap(r, r);
00178       r--;  // skip back before deleted guy.
00179     }
00180   }
00181 
00182   // check write sockets.
00183   for (int w = 0; w < write_sox.length(); w++) {
00184     const int sock = write_sox[w];
00185     if (owns_socket(sock)) {
00186       socket_data *sd = lock_socket_data(sock);
00187       if (!sd) continue;  // something hosed there.
00188       push_sends(*sd, SI_WRITABLE);
00189       unlock_socket_data(sd);
00190       write_sox.zap(w, w);
00191       w--;  // skip back before deleted guy.
00192     }
00193   }
00194 }
00195 
      // sorts the tracked sockets into three arrays while holding the lock.
      //   read_sox:  receives every socket that is not connection-pending.
      //   write_sox: receives the non-pending sockets that still have unsent
      //              data queued in _partially_sent.
      //   pendings:  receives the sockets whose connection is still pending.
      // the arrays are only appended to here; nothing is cleared first.
00196 void socket_minder::get_sockets(int_array &read_sox, int_array &write_sox,
00197     int_array &pendings) const
00198 {
00199   auto_synchronizer l(*_lock);
00200   for (int i = 0; i < _socket_list->elements(); i++) {
00201     socket_data *sd = _socket_list->borrow(i);
00202     if (sd->_connection_pending) {
00203       // this is not ready for sends and receives yet.
00204       pendings += sd->_socket;
00205     } else {
00206       // always add sockets to test if they have data waiting.
00207       read_sox += sd->_socket;
00208       // only check on writability if there is data pending for sending.
00209       if (sd->_partially_sent.length())
00210         write_sox += sd->_socket;
00211     }
00212   }
00213 }
00214 
      // returns true if "socket" matches the _socket of any tracked socket_data.
      // the list is scanned linearly under the lock.
00215 bool socket_minder::owns_socket(int socket) const
00216 {
00217   auto_synchronizer l(*_lock);
00218   for (int i = 0; i < _socket_list->elements(); i++) {
00219     if (_socket_list->borrow(i)->_socket == socket) return true;
00220   }
00221   return false;
00222 }
00223 
      // finds the socket_data record for "socket" and returns it with the lock
      // still held.
      //   on success: a non-NIL pointer is returned and _lock REMAINS LOCKED; the
      //     caller must hand the pointer back through unlock_socket_data().
      //   on failure: NIL is returned and the lock has already been released.
00224 socket_data *socket_minder::lock_socket_data(int socket)
00225 {
00226   _lock->lock();
00227   for (int i = 0; i < _socket_list->elements(); i++)
00228     if (_socket_list->borrow(i)->_socket == socket)
            // early return on purpose: the lock is kept for the caller.
00229       return _socket_list->borrow(i);
00230   // this is a failure to get here; there was no suitable socket.
00231   _lock->unlock();
00232   return NIL;
00233 }
00234 
      // releases the lock that a successful lock_socket_data() left held.
      //   to_unlock: the pointer lock_socket_data() returned.  NIL is ignored
      //     without unlocking, which matches the failure case of
      //     lock_socket_data() where the lock was already released.
      // the pointer is taken by value, so the caller's copy is not cleared.
00235 void socket_minder::unlock_socket_data(socket_data *to_unlock)
00236 {
00237   if (!to_unlock) return;
00238 //can't affect it now.  to_unlock = NIL;
00239   _lock->unlock();
00240 }
00241 
      // starts tracking "socket" by appending a new socket_data record.
      //   socket, server, server_socket, connected_mode: passed straight to the
      //     socket_data constructor.
      //   connection_pending: stored in the new record's _connection_pending.
      // returns false (changing nothing) if the socket is already tracked, and
      // true once the record has been appended.
00242 bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
00243     bool connected_mode, bool connection_pending)
00244 {
00245   auto_synchronizer l(*_lock);
        // NOTE(review): lock_socket_data() locks _lock again while the
        // synchronizer above already holds it, so this relies on the mutex
        // allowing the same thread to lock it repeatedly -- confirm.
00246   socket_data *harpo = lock_socket_data(socket);
00247   if (harpo) {
          // already known; undo the extra lock taken by the lookup.
00248     unlock_socket_data(harpo);
00249     return false;
00250   }
00251   socket_data *new_data = new socket_data(socket, server, server_socket,
00252       connected_mode);
00253   _socks->set_non_blocking(socket);
00254     // ensure the new guy is treated as non-blocking.  unix does not seem
00255     // to inherit this from the parent.
00256   new_data->_connection_pending = connection_pending;
00257   _socket_list->append(new_data);
00258   return true;
00259 }
00260 
      // stops tracking "socket".  the first matching record gets one final
      // evaluate_interest() pass and is then zapped from the list.
      // returns true if a record was found and removed, false otherwise.
00261 bool socket_minder::remove_socket_data(int socket)
00262 {
00263   FUNCDEF("remove_socket_data");
00264   auto_synchronizer l(*_lock);
00265   for (int i = 0; i < _socket_list->elements(); i++) {
00266     if (_socket_list->borrow(i)->_socket == socket) {
00267       // give the socket a last chance to get its data out.
00268       evaluate_interest(*_socket_list->borrow(i));
            // presumably the amorph also destroys the zapped socket_data --
            // TODO confirm against amorph::zap.
00269       _socket_list->zap(i, i);
00270       return true;
00271     }
00272   }
00273 //  LOG(isprintf("couldn't find socket %d.", socket));
00274   return false;
00275 }
00276 
      // replaces the registered interest flags for "socket".
      //   interests: stored as-is in _registered_interests; other code in this
      //     file tests it against SI_* bits (e.g. SI_CONNECTED).  the previous
      //     value is overwritten rather than merged.
      // returns false if the socket is not tracked, true otherwise.
00277 bool socket_minder::register_interest(int socket, int interests)
00278 {
00279   FUNCDEF("register_interest");
00280   socket_data *harpo = lock_socket_data(socket);
00281 #ifdef DEBUG_SOCKET_MINDER
00282   LOG(istring(istring::SPRINTF, "registering interest of %d for socket "
00283       "%d.", interests, socket));
00284 #endif
        // a NIL result means lock_socket_data() already released the lock.
00285   if (!harpo) return false;
00286   harpo->_registered_interests = interests;
00287   unlock_socket_data(harpo);
00288   return true;
00289 }
00290 
      // returns the _connection_pending flag recorded for "socket".  false is
      // also returned when the socket is not tracked, so callers cannot tell
      // "unknown socket" apart from "not pending" through this call.
00291 bool socket_minder::is_connection_pending(int socket)
00292 {
00293   socket_data *harpo = lock_socket_data(socket);
00294   if (!harpo) return false;
        // copy the flag out before the lock is released.
00295   bool to_return = harpo->_connection_pending;
00296   unlock_socket_data(harpo);
00297   return to_return;
00298 }
00299 
      // sets the _connection_pending flag recorded for "socket" to "pending".
      // returns false if the socket is not tracked, true once the flag is set.
00300 bool socket_minder::set_connection_pending(int socket, bool pending)
00301 {
00302   socket_data *harpo = lock_socket_data(socket);
00303   if (!harpo) return false;
00304   harpo->_connection_pending = pending;
00305   unlock_socket_data(harpo);
00306   return true;
00307 }
00308 
      // posts an event to a recipient through the post office.
      //   to_fire: second argument of the OS_event (callers here pass _message).
      //   at_whom: destination given to post_office::drop_off().
      //   parm1, parm2: event parameters; callers in this file pass the socket
      //     number and an SI_* value respectively.
      // the OS_event is allocated here; presumably the post office takes
      // ownership of it -- TODO confirm against post_office::drop_off.
00309 void socket_minder::fire_event(int to_fire, int at_whom,
00310     u_int parm1, u_int parm2)
00311 {
00312   _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
00313 }
00314 
      // records "to_put" in the collection of pending server sockets.
      //   to_put:  the socket to record; zero is treated as bogus and ignored.
      //   at_head: when true the socket is written into a new slot at index 0,
      //            otherwise it is added with the set's += operator.
00315 void socket_minder::put_pending_server(int to_put, bool at_head)
00316 {
00317   if (!to_put) return;  // bogus.
00318   auto_synchronizer l(*_lock);
00319   if (at_head) {
          // open one slot at the front and fill it in directly.
          // NOTE(review): this path writes the slot without any membership
          // check -- confirm duplicates are acceptable here.
00320     _pending_sox->insert(0, 1);
00321     (*_pending_sox)[0] = to_put;
00322   } else {
00323     *_pending_sox += to_put;
00324   }
00325 }
00326 
      // removes "socket" from the pending server collection.
      // returns false if it was not a member, true after removing it.
00327 bool socket_minder::zap_pending_server(int socket)
00328 {
00329   auto_synchronizer l(*_lock);
00330   if (!_pending_sox->member(socket)) return false;
00331   _pending_sox->remove(socket);
00332   return true;
00333 }
00334 
      // removes and returns the pending server socket stored at index 0.
      // returns 0 when nothing is pending; this pairs with put_pending_server()
      // refusing to store a zero socket.
00335 int socket_minder::get_pending_server()
00336 {
00337   auto_synchronizer l(*_lock);
00338   if (!_pending_sox->length()) return 0;
00339   int to_return = _pending_sox->get(0);
00340   _pending_sox->zap(0, 0);
00341   return to_return;
00342 }
00343 
      // services a socket whose connection is still pending.
      //   to_peek: the record to examine.  no locking is done in here and
      //     _pending_sox is touched directly; the callers in this file all hold
      //     _lock when they call this.
      // returns false when the socket is not pending or when its last alert is
      // too recent; returns true once a check has been performed, whether or
      // not an event was actually fired.
00344 bool socket_minder::handle_pending_connecters(socket_data &to_peek)
00345 {
00346   FUNCDEF("handle_pending_connecters");
00347   if (!to_peek._connection_pending) return false;  // not needed here.
00348 
        // rate limiting: presumably time_stamp(-CONN_ALERT_INTERVAL) denotes a
        // moment that far in the past -- TODO confirm against time_stamp.
00349   if (to_peek._last_conn_alert > time_stamp(-CONN_ALERT_INTERVAL)) {
00350     // not time yet.
00351     return false;
00352   }
00353   to_peek._last_conn_alert.reset();
00354 
00355   // force the issue; there is no simple way to portably know whether
00356   // the socket got a connection or not, so just say it did.
00357   if (!to_peek._is_server
00358       && (to_peek._registered_interests & SI_CONNECTED) ) {
00359     // deal with a client first.  this just means, zing an event.
00360 #ifdef DEBUG_SOCKET_MINDER
00361     LOG(isprintf("sending client SI_CONNECTED event on parent %d",
00362         _parent_route));
00363 #endif
00364     fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
00365   } else if (to_peek._is_server
00366       && (to_peek._registered_interests & SI_CONNECTED) ) {
00367     // special handling for servers.  we accept the waiting guy if he's
00368     // there.  otherwise we don't send the event.
          // the peer address filled in by accept() is not used afterwards.
00369     sockaddr sock_addr;
00370     socklen_t sock_len = sizeof(sock_addr);
00371     int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
00372       // check for a new socket.
00373     if (new_sock != INVALID_SOCKET) {
00374       LOG(isprintf("accept got a client socket %d.", new_sock));
00375       if (_pending_sox->member(new_sock)) {
00376         LOG(isprintf("already have seen socket %d in pending!", new_sock));
00377       } else {
              // remember the accepted socket so that the parent can collect it
              // later through get_pending_server().
00378         *_pending_sox += new_sock;
00379 #ifdef DEBUG_SOCKET_MINDER
00380         LOG(isprintf("sending server SI_CONNECTED event on parent %d",
00381             _parent_route));
00382 #endif
              // the event carries the listening socket, not the accepted one.
00383         fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
00384       }
00385     } else if (_pending_sox->length()) {
00386       // there are still pending connectees.
00387       fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
00388     }
00389   }
00390   // also, if the connection is still pending, we don't want to select on
00391   // it yet.
00392   return true;
00393 }
00394 
      // checks one socket right away and reacts to whatever state it is in.
      //   to_peek: the record to examine; remove_socket_data() calls this with
      //     _lock held.
      // a connection-pending socket is delegated to handle_pending_connecters()
      // and that result is returned.  every other path returns true.
00395 bool socket_minder::evaluate_interest(socket_data &to_peek)
00396 {
00397   FUNCDEF("evaluate_interest");
00398   if (to_peek._connection_pending) {
00399     return handle_pending_connecters(to_peek);
00400   }
00401 
        // a zero mode is passed to the single-socket select; what that selects
        // for is defined by raw_socket -- TODO confirm.
00402   int sel_mode = 0;
00403 
00404   int states = _socks->select(to_peek._socket, sel_mode);
00405 
00406   if (!states) {
00407     return true;  // nothing to report.
00408   }
00409 
        // data is only pumped when the socket is neither erroneous nor
        // disconnected.
00410   if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
00411     push_sends(to_peek, states);
00412     push_receives(to_peek, states);
00413   }
00414 
00415   if ( (to_peek._registered_interests & SI_ERRONEOUS)
00416       && (states & SI_ERRONEOUS) ) {
00417     // there's some kind of bad problem on the socket.
00418     LOG(isprintf("socket %d has status of erroneous.", to_peek._socket));
00419 //hmmm: what to do?  generate an event?
00420   }
00421 
00422   if ( (to_peek._registered_interests & SI_DISCONNECTED)
00423       && (states & SI_DISCONNECTED) ) {
00424     // we lost our state of connectedness.
00425     fire_event(_message, _parent_route, to_peek._socket,
00426         SI_DISCONNECTED);
00427     return true;  // get out now.
00428   }
00429 
00430   return true;
00431 }
00432 
      // tries to write queued outgoing data for one socket.
      //   to_poke: the record whose _partially_sent bytes should go out.
      //   states:  SI_* flags; nothing is sent unless SI_WRITABLE is set and
      //            there are bytes queued.
      // a single send() is attempted and the bytes it accepted are removed from
      // the front of the queue.  on ENOTSOCK, ECONNABORTED or ECONNRESET an
      // SI_DISCONNECTED event is fired and the queue is cleared.
00433 void socket_minder::push_sends(socket_data &to_poke, int states)
00434 {
00435   FUNCDEF("push_sends");
        // a pending connection is only logged; processing continues regardless.
00436   if (to_poke._connection_pending) {
00437     LOG("not supposed to try this when not connected yet...");
00438   }
00439 
00440 #ifdef DEBUG_SOCKET_MINDER
00441   if (to_poke._partially_sent.length()) {
00442     LOG(isprintf("socket %d: %d bytes to send.", to_poke._socket,
00443         to_poke._partially_sent.length()));
00444   }
00445 #endif
00446 
00447   int error_code = 0;
00448 
00449   if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
00450     // write to the socket since we see an opportunity.
00451     byte_array &to_send = to_poke._partially_sent;
00452     int len_sent = send(to_poke._socket, (char *)to_send.observe(),
00453         to_send.length(), 0);
00454     if (!len_sent) {
00455       // nothing got sent.
00456       LOG(isprintf("didn't send any data on socket %d.", to_poke._socket));
00457     } else if (len_sent == SOCKET_ERROR) {
00458       // something bad happened.
00459       error_code = portable::system_error();
00460     } else {
00461 #ifdef DEBUG_SOCKET_MINDER
00462       LOG(isprintf("updating that %d bytes got sent out of %d to send.",
00463           len_sent, to_send.length()));
00464       if (to_send.length() != len_sent)
00465         LOG(isprintf("size to send (%d) not same as size sent (%d).",
00466             to_send.length(), len_sent));
00467 #endif
00468       // update the partially sent chunk for the bit we sent out.
            // any unsent remainder stays queued for a later call.
00469       to_send.zap(0, len_sent - 1);
00470     }
00471   }
00472 
00473   // handle errors we have seen.
00474   if (error_code) {
          // a would-block result is not logged.
00475     if (error_code != SOCK_EWOULDBLOCK)
00476       LOG(istring(istring::SPRINTF, "error on socket %d is %s.",
00477           to_poke._socket, _stack->tcpip_error_name(error_code).s()));
00478 
          // only the codes listed below cause an event; any other error leaves
          // the queued data in place.
00479     switch (error_code) {
00480       case SOCK_ENOTSOCK:  // fall-through.
00481       case SOCK_ECONNABORTED:  // fall-through.
00482       case SOCK_ECONNRESET: {
00483         // the connection got hammerlocked somehow.
00484         LOG(isprintf("due to %s condition, closing socket %d.",
00485             _stack->tcpip_error_name(error_code).s(), to_poke._socket));
00486         fire_event(_message, _parent_route, to_poke._socket,
00487             SI_DISCONNECTED);
00488         to_poke._partially_sent.reset();  // clear with no connection.
00489         break;
00490       }
00491     }
00492   }
00493 }
00494 
      // pulls waiting data off one socket and tells the parent about it.
      //   to_poke: the record whose socket is read; new bytes are appended to
      //            its _partially_received buffer.
      //   states:  SI_* flags; nothing happens unless SI_READABLE is set.
      // connected-mode sockets are read with up to SOCKMIND_MAXIMUM_RECEIVES
      // recv() calls, after which SI_READABLE is fired if any bytes are
      // buffered and no disconnect was seen.  for other sockets SI_READABLE is
      // fired without reading.  on ENOTSOCK, ECONNABORTED or ECONNRESET an
      // SI_DISCONNECTED event is fired.
00495 void socket_minder::push_receives(socket_data &to_poke, int states)
00496 {
00497   FUNCDEF("push_receives");
        // a pending connection is only logged; processing continues regardless.
00498   if (to_poke._connection_pending) {
00499     LOG("not supposed to try this when not connected yet...");
00500   }
00501 
00502 #ifdef DEBUG_SOCKET_MINDER
00503   if (to_poke._partially_received.length())
00504     LOG(isprintf("socket %d: %d bytes already received.", to_poke._socket,
00505         to_poke._partially_received.length()));
00506 #endif
00507 
00508   int error_code = 0;
00509 
00510   if ( (states & SI_READABLE) && to_poke._connected_mode) {
00511     // grab any data that's waiting on the connection-oriented socket.
00512 
00513     bool got_something = true;  // hopeful for now.
00514     to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
00515       // allocate space where we'll get new data.
00516     int count = 0;
          // keep reading while data arrives, up to the per-call limit.
00517     while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
00518       got_something = false;  // now get all pessimistic.
            // NOTE(review): recv() writes through the pointer from observe(),
            // which is cast to a plain char * here -- confirm that writing
            // through it is acceptable for byte_array.
00519       int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
00520           to_poke._receive_buffer.length(), 0);
00521       if (len > 0) {
00522 #ifdef DEBUG_SOCKET_MINDER
00523         LOG(isprintf("received %d bytes on socket %d.", len, to_poke._socket));
00524 #endif
00525         // we received some actual data; we're happy again.
00526         got_something = true;
00527         // zap any extra off.
00528         if (len < MAXIMUM_TRANSFER_CHUNK)
00529           to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
00530         // add in what we were given.
00531         to_poke._partially_received += to_poke._receive_buffer;
00532         to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
00533           // reset to the right size for trying some more.
00534       } else {
              // this branch covers both a recv() failure and a zero return.
              // NOTE(review): for a zero return the system error value may not
              // relate to this call -- confirm that is harmless.
00535         error_code = portable::system_error();
00536 
00537         // reset the states flag based on current state.
00538         states = _socks->select(to_poke._socket,
00539             raw_socket::SELECTING_JUST_READ);
00540         if (states & SI_DISCONNECTED) {
00541           error_code = SOCK_ECONNRESET;  // make like regular disconnect.
00542           LOG(isprintf("noticed disconnection on socket %d.",
00543               to_poke._socket));
00544         }
00545 
00546       }
00547     }
00548 
          // announce buffered data unless the socket was seen to disconnect.
          // bytes left over from earlier calls also count here.
00549     if ( !(states & SI_DISCONNECTED)
00550         && to_poke._partially_received.length()) { 
00551 #ifdef DEBUG_SOCKET_MINDER
00552       LOG(isprintf("firing readable now: sock=%d", to_poke._socket));
00553 #endif
00554       fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
00555     }
00556   } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
00557     // datagram style; we need to still alert the parent.
00558     fire_event(_message, _parent_route, to_poke._socket,
00559         SI_READABLE);
00560   }
00561 
00562   // handle errors we have seen.
00563   if (error_code) {
          // a would-block result is not logged.
00564     if (error_code != SOCK_EWOULDBLOCK)
00565       LOG(istring(istring::SPRINTF, "error on socket %d is %s.",
00566           to_poke._socket, _stack->tcpip_error_name(error_code).s()));
00567 
00568     switch (error_code) {
00569       case SOCK_ENOTSOCK:  // fall-through.
00570       case SOCK_ECONNABORTED:  // fall-through.
00571       case SOCK_ECONNRESET: {
00572         // the connection got hammerlocked somehow.
00573         LOG(isprintf("due to %s condition, closing socket %d.",
00574             _stack->tcpip_error_name(error_code).s(), to_poke._socket));
00575         fire_event(_message, _parent_route, to_poke._socket,
00576             SI_DISCONNECTED);
              // note that it is the outgoing queue which gets cleared here; the
              // bytes already received are kept.
00577         to_poke._partially_sent.reset();  // clear with no connection.
00578         break;
00579       }
00580     }
00581   }
00582 }
00583 
00584 
00585 #endif //SOCKET_MINDER_IMPLEMENTATION_FILE
00586 

Generated on Fri Oct 10 04:28:34 2008 for HOOPLE Libraries by  doxygen 1.5.1