00001 #ifndef SOCKET_MINDER_IMPLEMENTATION_FILE
00002 #define SOCKET_MINDER_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "raw_socket.h"
00019 #include "socket_data.h"
00020 #include "socket_minder.h"
00021 #include "tcpip_definitions.h"
00022 #include "tcpip_stack.h"
00023
00024 #include <basis/mutex.h>
00025 #include <basis/log_base.h>
00026 #include <basis/portable.h>
00027 #include <basis/set.cpp>
00028 #include <data_struct/amorph.cpp>
00029 #include <data_struct/unique_id.h>
00030 #include <mechanisms/ithread.h>
00031 #include <post_office/mailbox.h>
00032 #include <post_office/os_event.h>
00033 #include <post_office/post_office.h>
00034
00035 #include <errno.h>
00036 #ifdef __WIN32__
00037 #include <ws2tcpip.h>
00038 #endif
00039 #ifdef __UNIX__
00040 #include <arpa/inet.h>
00041 #include <sys/socket.h>
00042 #endif
00043
00044
00045
00046
// drop any LOG macro inherited from the headers above and route this file's
// LOG() calls through CLASS_EMERGENCY_LOG on the program-wide logger.
#undef LOG
#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)
00049
// interval handed to the ithread that drives snoozy_select() (see
// socket_minder_prompter).  presumably milliseconds -- TODO confirm against
// the ithread documentation.
const int SOCKET_CHECK_INTERVAL = 50;

// upper bound on the number of recv() calls made during a single
// push_receives() invocation.
const int SOCKMIND_MAXIMUM_RECEIVES = 10;

// size that the per-socket receive buffer is reset to before each recv();
// also the most that one recv() call is asked to deliver.
const int MAXIMUM_TRANSFER_CHUNK = 512 * KILOBYTE;

// handle_pending_connecters() stays quiet for a socket whose last alert is
// newer than this offset.  presumably milliseconds -- TODO confirm against
// time_stamp.
const int CONN_ALERT_INTERVAL = 100;

// timeout passed to the multi-socket select() call in snoozy_select().
// presumably milliseconds -- TODO confirm against raw_socket::select.
const int MULTI_SELECT_TIMEOUT = 250;
00064
00065
00066
00068
// the container type socket_minder uses for its list of tracked sockets: an
// amorph holding socket_data objects, with nothing added.
class socket_data_amorph : public amorph<socket_data> {};
00070
00072
00073 class socket_minder_prompter : public ithread
00074 {
00075 public:
00076 socket_minder_prompter(socket_minder &parent)
00077 : ithread(SOCKET_CHECK_INTERVAL, ithread::SLACK_INTERVAL),
00078 _parent(parent)
00079 {
00080 start(NIL);
00081 }
00082
00083 ~socket_minder_prompter() {
00084 stop();
00085 }
00086
00087 virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
00088
00089 private:
00090 socket_minder &_parent;
00091 };
00092
00094
// creates a minder that announces socket activity by dropping OS_event
// objects into "post".  the events carry "event_type" and "message" and are
// addressed to "parent_route" (see fire_event() and its callers).  all helper
// objects are heap allocated here and released in the destructor.
socket_minder::socket_minder(post_office &post, int parent_route,
    int event_type, int message)
: _post(post),
  _parent_route(parent_route),
  _event_type(event_type),
  _lock(new mutex),
  _socket_list(new socket_data_amorph),
  _socks(new raw_socket),
  _stack(new tcpip_stack),
  _message(message),
  _pending_sox(new int_set),
  _prompter(new socket_minder_prompter(*this))
{
  // NOTE(review): socket_minder_prompter's constructor already calls
  // start(NIL), so this is a second start request on the same thread --
  // confirm that ithread::start() tolerates being called twice.
  _prompter->start(NIL);
}
00110
// stops the prompter thread first, since its activity calls back into this
// object, and only then releases the helper objects made by the constructor.
socket_minder::~socket_minder()
{
  _prompter->stop();
  WHACK(_prompter);
  WHACK(_socket_list);
  WHACK(_lock);
  WHACK(_pending_sox);
  WHACK(_socks);
  WHACK(_stack);
}
00121
// stops the prompter thread, so snoozy_select() is no longer invoked by it.
// nothing is freed here; the destructor does that.
void socket_minder::disengage()
{
  _prompter->stop();
}
00126
00127 istring socket_minder::text_form() const
00128 {
00129 auto_synchronizer l(*_lock);
00130 istring to_return;
00131
00132 for (int i = 0; i < _socket_list->elements(); i++) {
00133 const socket_data *curr = _socket_list->get(i);
00134 to_return += curr->text_form();
00135 if (i != _socket_list->elements() - 1)
00136 to_return += log_base::platform_ending();
00137 }
00138
00139 return to_return;
00140 }
00141
00142 void socket_minder::snoozy_select()
00143 {
00144 FUNCDEF("snoozy_select");
00145 int_array read_sox;
00146 int_array write_sox;
00147 int_array pending;
00148
00149 get_sockets(read_sox, write_sox, pending);
00150
00151
00152 for (int p = 0; p < pending.length(); p++) {
00153 socket_data *sd = lock_socket_data(pending[p]);
00154 if (!sd) continue;
00155 handle_pending_connecters(*sd);
00156 unlock_socket_data(sd);
00157 }
00158
00159
00160 int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
00161 if (!ret || (!read_sox.length() && !write_sox.length()) ) {
00162 return;
00163 }
00164
00165
00166
00167
00168
00169
00170 for (int r = 0; r < read_sox.length(); r++) {
00171 const int sock = read_sox[r];
00172 if (owns_socket(sock)) {
00173 socket_data *sd = lock_socket_data(sock);
00174 if (!sd) continue;
00175 push_receives(*sd, SI_READABLE);
00176 unlock_socket_data(sd);
00177 read_sox.zap(r, r);
00178 r--;
00179 }
00180 }
00181
00182
00183 for (int w = 0; w < write_sox.length(); w++) {
00184 const int sock = write_sox[w];
00185 if (owns_socket(sock)) {
00186 socket_data *sd = lock_socket_data(sock);
00187 if (!sd) continue;
00188 push_sends(*sd, SI_WRITABLE);
00189 unlock_socket_data(sd);
00190 write_sox.zap(w, w);
00191 w--;
00192 }
00193 }
00194 }
00195
00196 void socket_minder::get_sockets(int_array &read_sox, int_array &write_sox,
00197 int_array &pendings) const
00198 {
00199 auto_synchronizer l(*_lock);
00200 for (int i = 0; i < _socket_list->elements(); i++) {
00201 socket_data *sd = _socket_list->borrow(i);
00202 if (sd->_connection_pending) {
00203
00204 pendings += sd->_socket;
00205 } else {
00206
00207 read_sox += sd->_socket;
00208
00209 if (sd->_partially_sent.length())
00210 write_sox += sd->_socket;
00211 }
00212 }
00213 }
00214
00215 bool socket_minder::owns_socket(int socket) const
00216 {
00217 auto_synchronizer l(*_lock);
00218 for (int i = 0; i < _socket_list->elements(); i++) {
00219 if (_socket_list->borrow(i)->_socket == socket) return true;
00220 }
00221 return false;
00222 }
00223
00224 socket_data *socket_minder::lock_socket_data(int socket)
00225 {
00226 _lock->lock();
00227 for (int i = 0; i < _socket_list->elements(); i++)
00228 if (_socket_list->borrow(i)->_socket == socket)
00229 return _socket_list->borrow(i);
00230
00231 _lock->unlock();
00232 return NIL;
00233 }
00234
00235 void socket_minder::unlock_socket_data(socket_data *to_unlock)
00236 {
00237 if (!to_unlock) return;
00238
00239 _lock->unlock();
00240 }
00241
00242 bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
00243 bool connected_mode, bool connection_pending)
00244 {
00245 auto_synchronizer l(*_lock);
00246 socket_data *harpo = lock_socket_data(socket);
00247 if (harpo) {
00248 unlock_socket_data(harpo);
00249 return false;
00250 }
00251 socket_data *new_data = new socket_data(socket, server, server_socket,
00252 connected_mode);
00253 _socks->set_non_blocking(socket);
00254
00255
00256 new_data->_connection_pending = connection_pending;
00257 _socket_list->append(new_data);
00258 return true;
00259 }
00260
00261 bool socket_minder::remove_socket_data(int socket)
00262 {
00263 FUNCDEF("remove_socket_data");
00264 auto_synchronizer l(*_lock);
00265 for (int i = 0; i < _socket_list->elements(); i++) {
00266 if (_socket_list->borrow(i)->_socket == socket) {
00267
00268 evaluate_interest(*_socket_list->borrow(i));
00269 _socket_list->zap(i, i);
00270 return true;
00271 }
00272 }
00273
00274 return false;
00275 }
00276
00277 bool socket_minder::register_interest(int socket, int interests)
00278 {
00279 FUNCDEF("register_interest");
00280 socket_data *harpo = lock_socket_data(socket);
00281 #ifdef DEBUG_SOCKET_MINDER
00282 LOG(istring(istring::SPRINTF, "registering interest of %d for socket "
00283 "%d.", interests, socket));
00284 #endif
00285 if (!harpo) return false;
00286 harpo->_registered_interests = interests;
00287 unlock_socket_data(harpo);
00288 return true;
00289 }
00290
00291 bool socket_minder::is_connection_pending(int socket)
00292 {
00293 socket_data *harpo = lock_socket_data(socket);
00294 if (!harpo) return false;
00295 bool to_return = harpo->_connection_pending;
00296 unlock_socket_data(harpo);
00297 return to_return;
00298 }
00299
00300 bool socket_minder::set_connection_pending(int socket, bool pending)
00301 {
00302 socket_data *harpo = lock_socket_data(socket);
00303 if (!harpo) return false;
00304 harpo->_connection_pending = pending;
00305 unlock_socket_data(harpo);
00306 return true;
00307 }
00308
00309 void socket_minder::fire_event(int to_fire, int at_whom,
00310 u_int parm1, u_int parm2)
00311 {
00312 _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
00313 }
00314
00315 void socket_minder::put_pending_server(int to_put, bool at_head)
00316 {
00317 if (!to_put) return;
00318 auto_synchronizer l(*_lock);
00319 if (at_head) {
00320 _pending_sox->insert(0, 1);
00321 (*_pending_sox)[0] = to_put;
00322 } else {
00323 *_pending_sox += to_put;
00324 }
00325 }
00326
00327 bool socket_minder::zap_pending_server(int socket)
00328 {
00329 auto_synchronizer l(*_lock);
00330 if (!_pending_sox->member(socket)) return false;
00331 _pending_sox->remove(socket);
00332 return true;
00333 }
00334
00335 int socket_minder::get_pending_server()
00336 {
00337 auto_synchronizer l(*_lock);
00338 if (!_pending_sox->length()) return 0;
00339 int to_return = _pending_sox->get(0);
00340 _pending_sox->zap(0, 0);
00341 return to_return;
00342 }
00343
00344 bool socket_minder::handle_pending_connecters(socket_data &to_peek)
00345 {
00346 FUNCDEF("handle_pending_connecters");
00347 if (!to_peek._connection_pending) return false;
00348
00349 if (to_peek._last_conn_alert > time_stamp(-CONN_ALERT_INTERVAL)) {
00350
00351 return false;
00352 }
00353 to_peek._last_conn_alert.reset();
00354
00355
00356
00357 if (!to_peek._is_server
00358 && (to_peek._registered_interests & SI_CONNECTED) ) {
00359
00360 #ifdef DEBUG_SOCKET_MINDER
00361 LOG(isprintf("sending client SI_CONNECTED event on parent %d",
00362 _parent_route));
00363 #endif
00364 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
00365 } else if (to_peek._is_server
00366 && (to_peek._registered_interests & SI_CONNECTED) ) {
00367
00368
00369 sockaddr sock_addr;
00370 socklen_t sock_len = sizeof(sock_addr);
00371 int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
00372
00373 if (new_sock != INVALID_SOCKET) {
00374 LOG(isprintf("accept got a client socket %d.", new_sock));
00375 if (_pending_sox->member(new_sock)) {
00376 LOG(isprintf("already have seen socket %d in pending!", new_sock));
00377 } else {
00378 *_pending_sox += new_sock;
00379 #ifdef DEBUG_SOCKET_MINDER
00380 LOG(isprintf("sending server SI_CONNECTED event on parent %d",
00381 _parent_route));
00382 #endif
00383 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
00384 }
00385 } else if (_pending_sox->length()) {
00386
00387 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
00388 }
00389 }
00390
00391
00392 return true;
00393 }
00394
00395 bool socket_minder::evaluate_interest(socket_data &to_peek)
00396 {
00397 FUNCDEF("evaluate_interest");
00398 if (to_peek._connection_pending) {
00399 return handle_pending_connecters(to_peek);
00400 }
00401
00402 int sel_mode = 0;
00403
00404 int states = _socks->select(to_peek._socket, sel_mode);
00405
00406 if (!states) {
00407 return true;
00408 }
00409
00410 if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
00411 push_sends(to_peek, states);
00412 push_receives(to_peek, states);
00413 }
00414
00415 if ( (to_peek._registered_interests & SI_ERRONEOUS)
00416 && (states & SI_ERRONEOUS) ) {
00417
00418 LOG(isprintf("socket %d has status of erroneous.", to_peek._socket));
00419
00420 }
00421
00422 if ( (to_peek._registered_interests & SI_DISCONNECTED)
00423 && (states & SI_DISCONNECTED) ) {
00424
00425 fire_event(_message, _parent_route, to_peek._socket,
00426 SI_DISCONNECTED);
00427 return true;
00428 }
00429
00430 return true;
00431 }
00432
00433 void socket_minder::push_sends(socket_data &to_poke, int states)
00434 {
00435 FUNCDEF("push_sends");
00436 if (to_poke._connection_pending) {
00437 LOG("not supposed to try this when not connected yet...");
00438 }
00439
00440 #ifdef DEBUG_SOCKET_MINDER
00441 if (to_poke._partially_sent.length()) {
00442 LOG(isprintf("socket %d: %d bytes to send.", to_poke._socket,
00443 to_poke._partially_sent.length()));
00444 }
00445 #endif
00446
00447 int error_code = 0;
00448
00449 if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
00450
00451 byte_array &to_send = to_poke._partially_sent;
00452 int len_sent = send(to_poke._socket, (char *)to_send.observe(),
00453 to_send.length(), 0);
00454 if (!len_sent) {
00455
00456 LOG(isprintf("didn't send any data on socket %d.", to_poke._socket));
00457 } else if (len_sent == SOCKET_ERROR) {
00458
00459 error_code = portable::system_error();
00460 } else {
00461 #ifdef DEBUG_SOCKET_MINDER
00462 LOG(isprintf("updating that %d bytes got sent out of %d to send.",
00463 len_sent, to_send.length()));
00464 if (to_send.length() != len_sent)
00465 LOG(isprintf("size to send (%d) not same as size sent (%d).",
00466 to_send.length(), len_sent));
00467 #endif
00468
00469 to_send.zap(0, len_sent - 1);
00470 }
00471 }
00472
00473
00474 if (error_code) {
00475 if (error_code != SOCK_EWOULDBLOCK)
00476 LOG(istring(istring::SPRINTF, "error on socket %d is %s.",
00477 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
00478
00479 switch (error_code) {
00480 case SOCK_ENOTSOCK:
00481 case SOCK_ECONNABORTED:
00482 case SOCK_ECONNRESET: {
00483
00484 LOG(isprintf("due to %s condition, closing socket %d.",
00485 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
00486 fire_event(_message, _parent_route, to_poke._socket,
00487 SI_DISCONNECTED);
00488 to_poke._partially_sent.reset();
00489 break;
00490 }
00491 }
00492 }
00493 }
00494
00495 void socket_minder::push_receives(socket_data &to_poke, int states)
00496 {
00497 FUNCDEF("push_receives");
00498 if (to_poke._connection_pending) {
00499 LOG("not supposed to try this when not connected yet...");
00500 }
00501
00502 #ifdef DEBUG_SOCKET_MINDER
00503 if (to_poke._partially_received.length())
00504 LOG(isprintf("socket %d: %d bytes already received.", to_poke._socket,
00505 to_poke._partially_received.length()));
00506 #endif
00507
00508 int error_code = 0;
00509
00510 if ( (states & SI_READABLE) && to_poke._connected_mode) {
00511
00512
00513 bool got_something = true;
00514 to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
00515
00516 int count = 0;
00517 while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
00518 got_something = false;
00519 int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
00520 to_poke._receive_buffer.length(), 0);
00521 if (len > 0) {
00522 #ifdef DEBUG_SOCKET_MINDER
00523 LOG(isprintf("received %d bytes on socket %d.", len, to_poke._socket));
00524 #endif
00525
00526 got_something = true;
00527
00528 if (len < MAXIMUM_TRANSFER_CHUNK)
00529 to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
00530
00531 to_poke._partially_received += to_poke._receive_buffer;
00532 to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
00533
00534 } else {
00535 error_code = portable::system_error();
00536
00537
00538 states = _socks->select(to_poke._socket,
00539 raw_socket::SELECTING_JUST_READ);
00540 if (states & SI_DISCONNECTED) {
00541 error_code = SOCK_ECONNRESET;
00542 LOG(isprintf("noticed disconnection on socket %d.",
00543 to_poke._socket));
00544 }
00545
00546 }
00547 }
00548
00549 if ( !(states & SI_DISCONNECTED)
00550 && to_poke._partially_received.length()) {
00551 #ifdef DEBUG_SOCKET_MINDER
00552 LOG(isprintf("firing readable now: sock=%d", to_poke._socket));
00553 #endif
00554 fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
00555 }
00556 } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
00557
00558 fire_event(_message, _parent_route, to_poke._socket,
00559 SI_READABLE);
00560 }
00561
00562
00563 if (error_code) {
00564 if (error_code != SOCK_EWOULDBLOCK)
00565 LOG(istring(istring::SPRINTF, "error on socket %d is %s.",
00566 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
00567
00568 switch (error_code) {
00569 case SOCK_ENOTSOCK:
00570 case SOCK_ECONNABORTED:
00571 case SOCK_ECONNRESET: {
00572
00573 LOG(isprintf("due to %s condition, closing socket %d.",
00574 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
00575 fire_event(_message, _parent_route, to_poke._socket,
00576 SI_DISCONNECTED);
00577 to_poke._partially_sent.reset();
00578 break;
00579 }
00580 }
00581 }
00582 }
00583
00584
00585 #endif //SOCKET_MINDER_IMPLEMENTATION_FILE
00586