00001 #ifndef SPOCKET_IMPLEMENTATION_FILE
00002 #define SPOCKET_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "address.h"
00019 #include "imp_sockets.h"
00020 #include "raw_socket.h"
00021 #include "spocket.h"
00022 #include "tcpip_definitions.h"
00023 #include "tcpip_stack.h"
00024
00025 #include <basis/byte_array.h>
00026 #include <basis/function.h>
00027 #include <basis/istring.h>
00028 #include <basis/mutex.h>
00029 #include <basis/portable.h>
00030 #include <data_struct/static_memory_gremlin.h>
00031 #include <mechanisms/time_stamp.h>
00032 #include <loggers/console_logger.h>
00033
00034 #ifdef __UNIX__
00035 #include <arpa/inet.h>
00036 #include <errno.h>
00037 #include <netdb.h>
00038 #include <signal.h>
00039 #include <string.h>
00040 #include <sys/ioctl.h>
00041 #include <sys/socket.h>
00042 #include <sys/types.h>
00043 #include <termios.h>
00044 #include <unistd.h>
00045 #endif
00046
00047
00048
00049
// Logging shorthands for this file.  Any LOG / FILT_LOG definition inherited
// from an earlier header is dropped first; both versions defined here write
// through console_logger().
#undef LOG
#define LOG(message) CLASS_EMERGENCY_LOG(console_logger(), message)
#undef FILT_LOG
#define FILT_LOG(log_filter, message) \
  CLASS_FILTER_LOG(console_logger(), message, log_filter)

// Backlog value handed to listen() when accept() creates the server socket.
const int PENDING_CONNECTIONS_ALLOWED = 14;

// Age threshold that connect() uses to decide whether the hostname should be
// resolved again; it is negated and passed to the time_stamp constructor.
// NOTE(review): the unit is not visible in this file--confirm with time_stamp.
const int RESOLVE_INTERVAL = 300;
00061
00062
00063
00064
00065
// Records that the connection is gone: clears the pending client-bind request
// and the "was connected" flag.  The do/while(0) wrapper makes the expansion a
// single statement, so the macro stays correct under an unbraced if/else.
// Callers must still terminate the invocation with a semicolon.
#define RECOGNIZE_DISCO \
  do { \
    _client_bind = false; \
    _was_connected = false; \
  } while (0)

// Leaves the enclosing function with "retval" when the object is not marked
// as connected, or when it is marked connected but holds no socket (in which
// case the disconnection is also recorded).
#define ENSURE_HEALTH(retval) \
  do { \
    if (!_was_connected) return retval; \
    if (!_socket) { RECOGNIZE_DISCO; return retval; } \
  } while (0)

// Leaves the enclosing function with "retval" when is_bogus() reports true.
#define CHECK_BOGUS(retval) \
  do { \
    if (is_bogus()) { return retval; } \
  } while (0)
00077
// GRAB_LOCK: on win32 builds this declares a local auto_synchronizer named
// "l" constructed from the object's _select_lock; on every other platform it
// expands to nothing.
#undef GRAB_LOCK
#if defined(__WIN32__)
  #define GRAB_LOCK auto_synchronizer l(*_select_lock)
#else
  #define GRAB_LOCK
#endif

#if defined(__UNIX__)
  // File-level mutex created through SAFE_STATIC.
  // NOTE(review): nothing in this file references it; presumably it guards
  // broken-pipe signal handling elsewhere--confirm before removing.
  SAFE_STATIC(mutex, __broken_pipe_synch, )
#endif
00092
00093 spocket::spocket(const internet_address &where, sock_types type)
00094 : _type(type),
00095 _socket(0),
00096 _server_socket(0),
00097 _was_connected(false),
00098 _client(false),
00099 _where(new internet_address(where)),
00100 _remote(new internet_address),
00101 _socks(new raw_socket),
00102 _stack(new tcpip_stack),
00103 _select_lock(new mutex),
00104 _last_resolve(new time_stamp),
00105 _client_bind(false),
00106 _cli_bind(new internet_address)
00107 {
00108 FUNCDEF("constructor");
00109 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
00110
00111 _client = true;
00112 } else if ( (type == CONNECTED) || (type == BOGUS_SOCK) ) {
00113
00114 } else {
00115
00116 LOG(isprintf("unknown socket type %d; failing out.", _type));
00117
00118 return;
00119 }
00120 }
00121
00122 spocket::~spocket()
00123 {
00124 FUNCDEF("destructor");
00125 #ifdef DEBUG_SPOCKET
00126 LOG(isprintf("closing spocket: ") + text_form());
00127 #endif
00128 disconnect();
00129 WHACK(_where);
00130 WHACK(_socks);
00131 WHACK(_stack);
00132 WHACK(_remote);
00133 WHACK(_select_lock);
00134 WHACK(_last_resolve);
00135 WHACK(_cli_bind);
00136 _client_bind = false;
00137 }
00138
00139
00140
00141 const internet_address &spocket::where() const { return *_where; }
00142 const internet_address &spocket::remote() const { return *_remote; }
00143
00144 tcpip_stack &spocket::stack() const { return *_stack; }
00145
00146
00147
00148 istring spocket::text_form()
00149 {
00150 isprintf sock_string("socket=%d", _socket);
00151 if (is_root_server())
00152 sock_string += isprintf("root-socket=%d", _server_socket);
00153
00154 return isprintf("%s spocket: %s, %s, %s",
00155 (is_client()? "client" :
00156 (is_root_server()? "root-server" : "server") ),
00157 (connected()? "connected" :
00158 (was_connected()? "unconnected (was once)" : "never-connected") ),
00159 sock_string.s(),
00160 _where->text_form().s());
00161 }
00162
00163 void spocket::bind_client(const internet_address &addr)
00164 {
00165 _client_bind = true;
00166 *_cli_bind = addr;
00167 }
00168
00169 const char *spocket::outcome_name(const outcome &to_name)
00170 {
00171 switch (to_name.value()) {
00172 case NOT_SERVER: return "NOT_SERVER";
00173 default: return communication_commons::outcome_name(to_name);
00174 }
00175 }
00176
00177 outcome spocket::disconnect()
00178 {
00179 FUNCDEF("disconnect");
00180 RECOGNIZE_DISCO;
00181 if (_socket) {
00182 #ifdef DEBUG_SPOCKET
00183 LOG(isprintf("closing socket %d", _socket));
00184 #endif
00185 _socks->close(_socket);
00186 _socket = 0;
00187 }
00188 if (_server_socket) {
00189 #ifdef DEBUG_SPOCKET
00190 LOG(isprintf("closing server socket %d", _server_socket));
00191 #endif
00192 _socks->close(_server_socket);
00193 _server_socket = 0;
00194 }
00195 return OKAY;
00196 }
00197
00198 bool spocket::connected()
00199 {
00200 FUNCDEF("connected");
00201 ENSURE_HEALTH(false);
00202
00203 if (_type != CONNECTED) return _was_connected;
00204
00205
00206 int sel_mode = 0;
00207 GRAB_LOCK;
00208 int ret = _socks->select(_socket, sel_mode);
00209
00210 if (ret == 0) {
00211 return true;
00212 }
00213 if ( (ret & SI_DISCONNECTED) || (ret & SI_ERRONEOUS) ) {
00214 RECOGNIZE_DISCO;
00215 return false;
00216 }
00217 return true;
00218 }
00219
00220 outcome spocket::await_readable(int timeout)
00221 {
00222 FUNCDEF("await_readable");
00223 CHECK_BOGUS(NO_CONNECTION);
00224 ENSURE_HEALTH(NO_CONNECTION);
00225 GRAB_LOCK;
00226 int mode = raw_socket::SELECTING_JUST_READ;
00227 int ret = _socks->select(_socket, mode, timeout);
00228 if (ret & SI_READABLE) return OKAY;
00229
00230 if (ret & SI_DISCONNECTED) {
00231 RECOGNIZE_DISCO;
00232 return NO_CONNECTION;
00233 }
00234 return _socket? NONE_READY : NO_CONNECTION;
00235
00236 }
00237
00238 outcome spocket::await_writable(int timeout)
00239 {
00240 FUNCDEF("await_writable");
00241 CHECK_BOGUS(NO_CONNECTION);
00242 ENSURE_HEALTH(NO_CONNECTION);
00243 GRAB_LOCK;
00244 int mode = raw_socket::SELECTING_JUST_WRITE;
00245 int ret = _socks->select(_socket, mode, timeout);
00246 if (ret & SI_WRITABLE) return OKAY;
00247
00248 if (ret & SI_DISCONNECTED) {
00249 RECOGNIZE_DISCO;
00250 return NO_CONNECTION;
00251 }
00252 return _socket? NONE_READY : NO_CONNECTION;
00253
00254 }
00255
00256 outcome spocket::connect(int communication_wait)
00257 {
00258 FUNCDEF("connect");
00259 CHECK_BOGUS(NO_CONNECTION);
00260 {
00261 GRAB_LOCK;
00262 if ( (_was_connected && !_client) || _server_socket) {
00263 #ifdef DEBUG_SPOCKET
00264 LOG("this object was already opened as a server!");
00265 #endif
00266 return BAD_INPUT;
00267 }
00268 _client = true;
00269 _was_connected = false;
00270 }
00271
00272 if (!_socket) {
00273
00274
00275 #ifdef DEBUG_SPOCKET
00276 LOG(istring("creating socket now for ") + _where->text_form());
00277 #endif
00278 GRAB_LOCK;
00279 int sock_type = SOCK_STREAM;
00280 int proto = IPPROTO_TCP;
00281
00282 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
00283 sock_type = SOCK_DGRAM;
00284 proto = IPPROTO_IP;
00285 }
00286 _socket = int(::socket(AF_INET, sock_type, proto));
00287 if ( (_socket == u_int(INVALID_SOCKET)) || !_socket) {
00288 _socket = NIL;
00289 LOG("Failed to open the client's connecting spocket.");
00290 return ACCESS_DENIED;
00291 }
00292
00293
00294
00295 _socks->set_non_blocking(_socket, false);
00296
00297 if (_type == BROADCAST) {
00298 if (!_socks->set_broadcast(_socket)) return ACCESS_DENIED;
00299
00300 }
00301
00302 if (!_socks->set_reuse_address(_socket)) return ACCESS_DENIED;
00303
00304 }
00305
00306 if (_type == CONNECTED) {
00307 GRAB_LOCK;
00308
00309
00310
00311
00312 if (!_socks->set_keep_alive(_socket)) {
00313 #ifdef DEBUG_SPOCKET
00314 LOG("couldn't set watchdog timer on socket.");
00315 #endif
00316 }
00317
00318
00319
00320
00321 if (strlen(_where->hostname)
00322 && (_where->is_nil_address()
00323 || (*_last_resolve < time_stamp(-RESOLVE_INTERVAL) ) ) ) {
00324
00325
00326 istring full_host;
00327 byte_array ip_addr = _stack->full_resolve(_where->hostname, full_host);
00328 if (ip_addr.length()) {
00329 ip_addr.stuff(internet_address::ADDRESS_SIZE, _where->ip_address);
00330 FILT_LOG(common::NETWORK_LOGGING, istring("successfully re-resolved "
00331 "address--") + _where->text_form());
00332 }
00333 *_last_resolve = time_stamp();
00334 }
00335
00336
00337 if (_client_bind) {
00338 sockaddr sock = _stack->convert(*_cli_bind);
00339
00340 #ifdef DEBUG_SPOCKET
00341 LOG(isprintf("binding client socket %d to ", _socket)
00342 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
00343 #endif
00344
00345
00346 if (negative(bind(_socket, &sock, sizeof(sock)))) {
00347 LOG(isprintf("error binding socket %d to ", _socket)
00348 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
00349 }
00350 }
00351
00352 } else if ( (_type == BROADCAST) || (_type == UNICAST) ) {
00353
00354
00355
00356 GRAB_LOCK;
00357 sockaddr sock = _stack->convert(*_where);
00358
00359 #ifdef EMBEDDED_BUILD
00360
00361 sockaddr_in *sock_in = (sockaddr_in *)&sock;
00362 sock_in->sin_addr.s_addr = htonl(INADDR_ANY);
00363 #endif
00364
00365 #ifdef DEBUG_SPOCKET
00366 LOG(isprintf("binding socket %d to ", _socket)
00367 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
00368 #endif
00369
00370
00371 if (negative(bind(_socket, &sock, sizeof(sock)))) {
00372 LOG(isprintf("error binding socket %d to ", _socket)
00373 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
00374 }
00375
00376
00377 _was_connected = true;
00378 return OKAY;
00379 }
00380
00381
00382
00383 sockaddr sock = _stack->convert(*_where);
00384
00385
00386
00387
00388
00389
00390
00391 time_stamp abort_time(communication_wait);
00392
00393 bool connected = false;
00394
00395 int sock_len = sizeof(sock);
00396
00397 while (time_stamp() < abort_time) {
00398
00399 int ret = ::connect(_socket, &sock, sock_len);
00400 if (ret != SOCKET_ERROR) {
00401 connected = true;
00402 _socks->set_non_blocking(_socket, true);
00403 break;
00404 }
00405
00406 u_int last_error = portable::system_error();
00407
00408
00409 if (last_error == SOCK_EISCONN) {
00410 connected = true;
00411 break;
00412 }
00413
00414 if ( (last_error != SOCK_EWOULDBLOCK)
00415 && (last_error != SOCK_EINPROGRESS) ) {
00416
00417 #ifdef DEBUG_SPOCKET
00418 LOG(isprintf("Connect failed (error %s or %d) on address:",
00419 portable::system_error_text(last_error).s(), last_error)
00420 + _where->text_form());
00421 #endif
00422 if (last_error == SOCK_ECONNREFUSED) return NO_ANSWER;
00423
00424 return ACCESS_DENIED;
00425 }
00426
00427 if (time_stamp() >= abort_time) break;
00428
00429
00430 portable::sleep_ms(10);
00431 }
00432
00433 if (connected) {
00434 #ifdef DEBUG_SPOCKET
00435 LOG(isprintf("socket %d connected to server.", _socket));
00436 #endif
00437 GRAB_LOCK;
00438 _was_connected = true;
00439 return OKAY;
00440 }
00441
00442 return TIMED_OUT;
00443 }
00444
00445 outcome spocket::accept(spocket * &sock, bool wait)
00446 {
00447 FUNCDEF("accept");
00448 CHECK_BOGUS(NO_CONNECTION);
00449 if (_type != CONNECTED) return BAD_INPUT;
00450
00451
00452
00453 sock = NIL;
00454
00455 if (_socket) {
00456 #ifdef DEBUG_SPOCKET
00457 LOG("tried to accept on a client spocket.");
00458 #endif
00459 return NOT_SERVER;
00460 }
00461 _client = false;
00462
00463 if (!_server_socket) {
00464 _server_socket = int(::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP));
00465 #ifdef DEBUG_SPOCKET
00466 LOG(isprintf("srv sock is %d", _server_socket));
00467 #endif
00468
00469 #ifdef DEBUG_SPOCKET
00470 LOG(istring("creating server socket now for ") + _where->text_form());
00471 #endif
00472
00473 if (_server_socket == u_int(INVALID_SOCKET)) {
00474 LOG("Failed to open the serving spocket.");
00475 return BAD_INPUT;
00476 }
00477
00478
00479 if (!_socks->set_reuse_address(_server_socket))
00480 LOG("Failed to mark the socket for re-use.");
00481
00482
00483 sockaddr sock = _stack->convert(*_where);
00484
00485
00486 int sock_len = sizeof(sock);
00487 if (bind(_server_socket, (sockaddr *)&sock, sock_len) < 0) {
00488 LOG(istring("Error on bind of ") + portable::system_error_text(portable::system_error()));
00489 _socks->close(_server_socket);
00490 return ACCESS_DENIED;
00491 }
00492
00493
00494 if (listen(_server_socket, PENDING_CONNECTIONS_ALLOWED) < 0) {
00495 LOG(istring("Listen failed with error of ")
00496 + portable::system_error_text(portable::system_error()));
00497 _socks->close(_server_socket);
00498 return ACCESS_DENIED;
00499 }
00500 }
00501
00502
00503
00504
00505 if (!wait) {
00506 _socks->set_non_blocking(_server_socket, true);
00507
00508 } else {
00509 _socks->set_non_blocking(_server_socket, false);
00510
00511 }
00512
00513
00514 sockaddr new_sock;
00515 socklen_t sock_len = sizeof(new_sock);
00516 int accepted = int(::accept(_server_socket, &new_sock, &sock_len));
00517 int error = portable::system_error();
00518 if (accepted == INVALID_SOCKET) {
00519 if (error == SOCK_EWOULDBLOCK) return NO_CONNECTION;
00520 #ifdef DEBUG_SPOCKET
00521 LOG(istring("Accept got no client, with an error of ")
00522 + portable::system_error_text(error));
00523 #endif
00524 return ACCESS_DENIED;
00525 }
00526
00527
00528 _socks->set_non_blocking(accepted, true);
00529
00530
00531 int sock_hop = 1;
00532 if (setsockopt(accepted, SOL_SOCKET, SO_KEEPALIVE, (char *)&sock_hop,
00533 sizeof(sock_hop)) < 0) {
00534 #ifdef DEBUG_SPOCKET
00535 LOG("couldn't set watchdog timer on socket.");
00536 #endif
00537 }
00538
00539
00540 #ifdef DEBUG_SPOCKET
00541 LOG(istring("accepted a client socket for ") + _where->text_form());
00542 #endif
00543
00544
00545
00546
00547
00548 sock = new spocket(*_where);
00549 *sock->_remote = _stack->convert(new_sock);
00550 sock->_socket = accepted;
00551 sock->_server_socket = 0;
00552 sock->_was_connected = true;
00553 return OKAY;
00554 }
00555
00556 outcome spocket::send(const byte_array &to_send, int &len_sent)
00557 {
00558 return send(to_send.observe(), to_send.length(), len_sent);
00559 }
00560
00561 outcome spocket::send(const byte *buffer, int size, int &len_sent)
00562 {
00563 FUNCDEF("send");
00564 CHECK_BOGUS(OKAY);
00565 if (_type != CONNECTED) return BAD_INPUT;
00566 GRAB_LOCK;
00567 ENSURE_HEALTH(NO_CONNECTION);
00568
00569 len_sent = ::send(_socket, (char *)buffer, size, 0);
00570 int error_code = portable::system_error();
00571 if (!len_sent) {
00572 #ifdef DEBUG_SPOCKET
00573 LOG("No data went out on the spocket.");
00574 #endif
00575 return PARTIAL;
00576 }
00577 if (len_sent == SOCKET_ERROR) {
00578 if (error_code == SOCK_EWOULDBLOCK) {
00579 #ifdef DEBUG_SPOCKET
00580 LOG("would block, will try later...");
00581 if (len_sent > 0)
00582 LOG("HEY HEY! some was sent but we were not counting it!!!");
00583 #endif
00584 return NONE_READY;
00585 }
00586 #ifdef DEBUG_SPOCKET
00587 LOG(istring("Error ") + portable::system_error_text(error_code)
00588 + " occurred during the send!");
00589 #endif
00590 if (!connected()) return NO_CONNECTION;
00591 #ifdef DEBUG_SPOCKET
00592 LOG(isprintf("forcing disconnect on socket %d.", _socket));
00593 #endif
00594
00595
00596 disconnect();
00597 return ACCESS_DENIED;
00598 }
00599 if (len_sent != size) {
00600
00601 #ifdef DEBUG_SPOCKET
00602 LOG(isprintf("sent %d bytes out of %d.", len_sent, size));
00603 #endif
00604 return PARTIAL;
00605 }
00606
00607 return OKAY;
00608 }
00609
00610 outcome spocket::send_to(const internet_address &where_to,
00611 const byte_array &to_send, int &len_sent)
00612 {
00613 return send_to(where_to, to_send.observe(), to_send.length(), len_sent);
00614 }
00615
00616 outcome spocket::send_to(const internet_address &where_to, const byte *to_send,
00617 int size, int &len_sent)
00618 {
00619 FUNCDEF("send_to");
00620 CHECK_BOGUS(OKAY);
00621 if (_type == CONNECTED) return BAD_INPUT;
00622 sockaddr dest = _stack->convert(where_to);
00623 int ret = sendto(_socket, (char *)to_send, size, 0, &dest, sizeof(dest));
00624 int error = portable::system_error();
00625 if (ret < 0) {
00626 if (error == SOCK_EWOULDBLOCK) return NONE_READY;
00627 LOG(istring("failed to send packet; error ")
00628 + _stack->tcpip_error_name(error));
00629 return ACCESS_DENIED;
00630 }
00631 if (ret != size) {
00632 LOG(istring("didn't send whole datagram!"));
00633 }
00634 len_sent = ret;
00635 return OKAY;
00636 }
00637
00638 outcome spocket::receive(byte_array &buffer, int &size)
00639 {
00640 FUNCDEF("receive");
00641 CHECK_BOGUS(NONE_READY);
00642 if (_type != CONNECTED) return BAD_INPUT;
00643 if (size <= 0) return BAD_INPUT;
00644 buffer.reset(size);
00645 outcome to_return = receive(buffer.access(), size);
00646
00647 if (to_return == OKAY)
00648 buffer.zap(size, buffer.last());
00649 return to_return;
00650 }
00651
00652 outcome spocket::receive(byte *buffer, int &size)
00653 {
00654 FUNCDEF("receive");
00655 CHECK_BOGUS(NONE_READY);
00656 if (_type != CONNECTED) return BAD_INPUT;
00657 ENSURE_HEALTH(NO_CONNECTION);
00658 int expected = size;
00659 size = 0;
00660 if (expected <= 0) return BAD_INPUT;
00661 GRAB_LOCK;
00662 int len = recv(_socket, (char *)buffer, expected, 0);
00663 if (!len) {
00664
00665 int ret = _socks->select(_socket, raw_socket::SELECTING_JUST_READ);
00666 if (ret & SI_DISCONNECTED) {
00667 RECOGNIZE_DISCO;
00668 return NO_CONNECTION;
00669 }
00670
00671 return NONE_READY;
00672 } else if (len < 0) {
00673 if (portable::system_error() == SOCK_EWOULDBLOCK) return NONE_READY;
00674 #ifdef DEBUG_SPOCKET
00675 LOG(istring("The receive failed with an error ")
00676 + portable::system_error_text(portable::system_error()));
00677 #endif
00678 if (!connected()) return NO_CONNECTION;
00679 return ACCESS_DENIED;
00680 }
00681 size = len;
00682 return OKAY;
00683 }
00684
00685 outcome spocket::receive_from(byte_array &buffer, int &size,
00686 internet_address &where_from)
00687 {
00688 FUNCDEF("receive_from");
00689 where_from = internet_address();
00690 CHECK_BOGUS(NONE_READY);
00691 if (_type == CONNECTED) return BAD_INPUT;
00692 if (size <= 0) return BAD_INPUT;
00693 buffer.reset(size);
00694 outcome to_return = receive_from(buffer.access(), size, where_from);
00695
00696 if (to_return == OKAY)
00697 buffer.zap(size, buffer.last());
00698 return to_return;
00699 }
00700
00701 outcome spocket::receive_from(byte *buffer, int &size,
00702 internet_address &where_from)
00703 {
00704 FUNCDEF("receive_from");
00705 where_from = internet_address();
00706 CHECK_BOGUS(NONE_READY);
00707 if (_type == CONNECTED) return BAD_INPUT;
00708 ENSURE_HEALTH(NO_CONNECTION);
00709 int expected = size;
00710 size = 0;
00711 if (expected <= 0) return BAD_INPUT;
00712 GRAB_LOCK;
00713 sockaddr from;
00714 socklen_t fromlen = sizeof(from);
00715 int len = recvfrom(_socket, (char *)buffer, expected, 0, &from, &fromlen);
00716 int err = portable::system_error();
00717 if (!len) return NONE_READY;
00718 else if (len < 0) {
00719 #ifdef DEBUG_SPOCKET
00720 LOG(isprintf("actual sys err value=%d", err));
00721 #endif
00722 if (err == SOCK_EWOULDBLOCK) return NONE_READY;
00723 if (err == SOCK_ECONNRESET) return NONE_READY;
00724
00725
00726
00727 #ifdef DEBUG_SPOCKET
00728 LOG(istring("The recvfrom failed with an error ")
00729 + portable::system_error_text(err));
00730 #endif
00731 if (!connected()) return NO_CONNECTION;
00732 return ACCESS_DENIED;
00733 }
00734 where_from = _stack->convert(from);
00735 size = len;
00736 return OKAY;
00737 }
00738
00739 #endif //SPOCKET_IMPLEMENTATION_FILE
00740