00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "crompish_pax.h"
00016
00017 #include <basis/chaos.h>
00018 #include <basis/istring.h>
00019 #include <basis/portable.h>
00020 #include <basis/set.cpp>
00021 #include <cromp/cromp_client.h>
00022 #include <mechanisms/ithread.h>
00023 #include <mechanisms/thread_cabinet.h>
00024 #include <mechanisms/throughput_counter.h>
00025 #include <octopus/entity_data_bin.h>
00026 #include <octopus/entity_defs.h>
00027 #include <octopus/infoton.h>
00028 #include <opsystem/application_shell.h>
00029 #include <opsystem/command_line.h>
00030 #include <loggers/console_logger.h>
00031 #include <loggers/file_logger.h>
00032 #include <opsystem/filename.h>
00033 #include <opsystem/rendezvous.h>
00034 #include <data_struct/static_memory_gremlin.h>
00035 #include <sockets/address.h>
00036
00037 #include <stdlib.h>
00038
// Discard any LOG definition inherited from the headers above and route this
// file's LOG() calls to the program-wide logger.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)

// Debugging switch for the tester; it is checked by the #ifdef near the end
// of execute().
#define DEBUG_TESTER
00043
00044
00045
00046
00047
00048
// Default number of requests each transmitter thread submits; overridden by
// the "sends" command-line value.
const int MAXIMUM_SENDS = 100008;

// Default number of transmitter (bitey) threads; overridden by "threads".
const int NUMBER_OF_THREADS = 1;

// Default number of grabber threads; overridden by "grab".
const int GRABBER_THREADS = 0;

// Default try count handed to cromp_client::submit(); overridden by "trysend".
const int MAX_SEND_TRIES = 1;

// Default number of sends between "send #" progress lines; overridden by
// "print".
const int CHECKPOINT_SIZE = 100;

// Default size passed to the bubble constructor for each test packet;
// overridden by "dataseg".
const int DATA_SEGMENT_SIZE = 64;

// Default time between throughput reports, in milliseconds; "report" supplies
// a replacement value in seconds.
const int REPORTING_INTERVAL = 10 * SECOND_ms;

// Timeout, in milliseconds, given to cromp_client::acquire() when the caller
// is not in "wait" mode; also the threshold for the slow-acquire diagnostics.
const int MAXIMUM_ACQUISITION_DELAY = 8 * SECOND_ms;

// Once a transmitter holds more outstanding request ids than this, it stops
// to collect receipts before sending more.
const int MAXIMUM_PENDING_REQUESTS = 108;

// How many receipts are looked for when the pending limit above is exceeded.
const int PENDING_REQUESTS_FORCED = MAXIMUM_PENDING_REQUESTS;

// cause_object_reconstruction() rolls 1..100 and rebuilds the client only
// when the roll is at or below this value.
const int CHANCE_OF_RECONSTRUCT = 14;
00105
00106
// LOG is for use inside class methods (it expands to CLASS_EMERGENCY_LOG);
// this repeats the definition given near the top of the file.
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
// BASE_LOG writes to the same program-wide logger through EMERGENCY_LOG.
#define BASE_LOG(s) EMERGENCY_LOG(program_wide_logger(), s)
00109
00110 class cromp_client_tester : public application_shell
00111 {
00112 public:
00113 cromp_client_tester();
00114 ~cromp_client_tester();
00115
00116 virtual int execute();
00117
00118 IMPLEMENT_CLASS_NAME("cromp_client_tester");
00119
00120 void bite_server(basis::set<octopus_request_id> &ids,
00121 basis::set<octopus_request_id> &delinquents, void *originator);
00122
00123
00124
00125
00126 int print_instructions();
00128
00129 void grab_items();
00131
00132 void cause_object_reconstruction();
00134
00135 void increment_thread_count() {
00136 FUNCDEF("increment_thread_count");
00137 auto_synchronizer l(*_lock);
00138 _threads_active++;
00139
00140 }
00141
00142 void decrement_thread_count() {
00143 FUNCDEF("decrement_thread_count");
00144 auto_synchronizer l(*_lock);
00145 _threads_active--;
00146
00147 }
00148
00149 void report(const time_stamp &start_time, double bytes_transmitted,
00150 double conversations);
00151
00152
00153 private:
00154 cromp_client *_uplink;
00155
00156 mutex *_lock;
00157 int _threads_active;
00158 time_stamp _last_report;
00159 double _finished_loops;
00160 bool _encryption;
00161 int _send_count;
00162 int _thread_count;
00163 int _grabber_count;
00164 int _send_tries;
00165 int _checkpoint_count;
00166 int _dataseg_size;
00167 int _report_interval;
00168 int _snooze_duration;
00169 bool _rpc_style;
00170 bool _reconstruct_object;
00171 internet_address _server_loc;
00172
00173 void look_for_receipts(int count, basis::set<octopus_request_id> &ids,
00174 basis::set<octopus_request_id> &delinquents, bool wait = false);
00175
00176 };
00177
00179
00180 class bitey_thread : public ithread
00181 {
00182 public:
00183 bitey_thread(cromp_client_tester &parent)
00184 : ithread(), _parent(parent) {}
00185
00186 void perform_activity(void *formal(ptr)) {
00187 FUNCDEF("perform_activity");
00188 _parent.increment_thread_count();
00189 _parent.bite_server(_ids, _delinquents, this);
00190 _parent.decrement_thread_count();
00191 }
00192
00193 private:
00194 cromp_client_tester &_parent;
00195 basis::set<octopus_request_id> _ids;
00196 basis::set<octopus_request_id> _delinquents;
00197 };
00198
00200
00201
00202
00203 class grabby_thread : public ithread
00204 {
00205 public:
00206 grabby_thread(cromp_client_tester &parent)
00207 : ithread(), _parent(parent) {}
00208
00209 void perform_activity(void *formal(ptr)) {
00210 while (!should_stop()) {
00211 _parent.grab_items();
00212 if (_rando.inclusive(0, 100) > 10)
00213 portable::sleep_ms(_rando.inclusive(5, 38));
00214 }
00215 }
00216
00217 private:
00218 cromp_client_tester &_parent;
00219 chaos _rando;
00220 };
00221
00223
00224 cromp_client_tester::cromp_client_tester()
00225 : application_shell("cromp_client_tester"),
00226 _uplink(NIL),
00227 _lock(new mutex),
00228 _threads_active(0),
00229 _finished_loops(0.0),
00230 _encryption(false),
00231 _send_count(0),
00232 _thread_count(0),
00233 _grabber_count(0),
00234 _send_tries(0),
00235 _checkpoint_count(0),
00236 _dataseg_size(0),
00237 _report_interval(0),
00238 _snooze_duration(0),
00239 _rpc_style(false),
00240 _reconstruct_object(false),
00241 _server_loc()
00242 {
00243 FUNCDEF("constructor");
00244 LOG("");
00245 LOG("");
00246
00247 command_line args(__argc, __argv);
00248
00249
00250 int indy = 0;
00251 if (args.find("help", indy, false)
00252 || (args.find("?", indy, false))
00253 || (args.find('?', indy, false)) ) {
00254 print_instructions();
00255 exit(0);
00256 }
00257
00258
00259 istring port_text;
00260 int port = 5678;
00261 if (args.get_value("port", port_text, false)) {
00262 LOG(istring("using port: ") + port_text);
00263 port = port_text.convert(5678);
00264 }
00265 _server_loc.port = port;
00266
00267
00268
00269 indy = 0;
00270 if (args.find("encrypt", indy, false) || (args.find('e', indy, true)) ) {
00271
00272 LOG("turning on encryption.");
00273 _encryption = true;
00274 }
00275
00276 indy = 0;
00277 if (args.find("rpc", indy, false) || (args.find('R', indy, true)) ) {
00278
00279
00280
00281 LOG("turning on RPC style requests.");
00282 _rpc_style = true;
00283 }
00284
00285
00286 istring hostname("local");
00287 istring host_temp;
00288 if (args.get_value("host", host_temp, false)) {
00289 LOG(istring("using host: ") + host_temp);
00290 hostname = host_temp;
00291 }
00292 LOG(istring("using host: ") + hostname);
00293 strcpy(_server_loc.hostname, hostname.s());
00294
00295 istring send_temp;
00296 int send_count = MAXIMUM_SENDS;
00297 if (args.get_value("sends", send_temp, false)) {
00298 LOG(istring("using send count: ") + send_temp);
00299 send_count = send_temp.convert(send_count);
00300 if (send_count <= 0) send_count = 1;
00301 }
00302 _send_count = send_count;
00303
00304 istring thread_temp;
00305 int thread_count = NUMBER_OF_THREADS;
00306 if (args.get_value("threads", thread_temp, false)) {
00307 LOG(istring("using thread count: ") + thread_temp);
00308 thread_count = thread_temp.convert(thread_count);
00309 if (thread_count <= 0) thread_count = 1;
00310 }
00311 _thread_count = thread_count;
00312
00313 istring grabber_temp;
00314 int grabber_count = GRABBER_THREADS;
00315 if (args.get_value("grab", grabber_temp, false)) {
00316 LOG(istring("using grabber count: ") + grabber_temp);
00317 grabber_count = grabber_temp.convert(grabber_count);
00318 if (grabber_count < 0) grabber_count = 0;
00319 }
00320 _grabber_count = grabber_count;
00321
00322 istring send_tries_temp;
00323 int send_tries = MAX_SEND_TRIES;
00324 if (args.get_value("trysend", send_tries_temp, false)) {
00325 LOG(istring("using send tries: ") + send_tries_temp);
00326 send_tries = send_tries_temp.convert(send_tries);
00327 if (send_tries < 0) send_tries = 0;
00328 }
00329 _send_tries = send_tries;
00330
00331
00332
00333
00334 istring checkpoint_temp;
00335 int checkpoint_count = CHECKPOINT_SIZE;
00336 if (args.get_value("print", checkpoint_temp, false)) {
00337 LOG(istring("using checkpoint count: ") + checkpoint_temp);
00338 checkpoint_count = checkpoint_temp.convert(checkpoint_count);
00339 if (checkpoint_count <= 0) checkpoint_count = 1;
00340 }
00341 _checkpoint_count = checkpoint_count;
00342
00343 istring dataseg_temp;
00344 int dataseg_size = DATA_SEGMENT_SIZE;
00345 if (args.get_value("dataseg", dataseg_temp, false)) {
00346 LOG(istring("using dataseg size: ") + dataseg_temp);
00347 dataseg_size = dataseg_temp.convert(dataseg_size);
00348 if (dataseg_size < 0) dataseg_size = 0;
00349 }
00350 _dataseg_size = dataseg_size;
00351
00352 istring report_temp;
00353 int report_interval = REPORTING_INTERVAL;
00354 if (args.get_value("report", report_temp, false)) {
00355 LOG(istring("using report interval: ") + report_temp);
00356 report_interval = report_temp.convert(report_interval);
00357 if (report_interval <= 0) report_interval = 1;
00358 report_interval *= SECOND_ms;
00359 }
00360 _report_interval = report_interval;
00361
00362 istring snooze_temp;
00363 int snooze_duration = 0;
00364 if (args.get_value("snooze", snooze_temp, false)) {
00365 LOG(istring("using snooze duration: ") + snooze_temp);
00366 snooze_duration = snooze_temp.convert(snooze_duration);
00367 if (snooze_duration < 0) snooze_duration = 0;
00368 }
00369 _snooze_duration = snooze_duration;
00370
00371 if (args.find("reconstruct", indy, false)) {
00372 LOG("saw reconstruct flag; will periodically tear down object.");
00373 _reconstruct_object = true;
00374 }
00375
00376 LOG(istring("opening at ") + _server_loc.text_form());
00377 _uplink = new cromp_client(_server_loc);
00378
00379 _uplink->add_tentacle(new bubbles_tentacle(false));
00380
00381 }
00382
// Releases the two heap objects the tester owns.
cromp_client_tester::~cromp_client_tester()
{
  WHACK(_lock);  // the mutex allocated in the constructor's initializer list.
  WHACK(_uplink);  // the client connection object.
}
00388
00389 int cromp_client_tester::print_instructions()
00390 {
00391 istring name = filename(__argv[0]).basename().raw();
00392 log(isprintf("%s usage:", name.s()));
00393 log("");
00394 log(isprintf("\
00395 This program connects to a cromp test server and exchanges packets to test\n\
00396 the performance of the cromp protocol. All command line flags are optional\n\
00397 but can be added to specify how the test should be performed. Currently,\n\
00398 the valid options are:\n\
00399 --help\tShow this set of command-line help.\n\
00400 -?\t\tditto.\n\
00401 --port N\tConnect to the server on the port specified.\n\
00402 --host X\tConnect to server at IP address or hostname X.\n\
00403 --encrypt\tEncrypt the connection. Server must do this also.\n\
00404 -e\t\tditto.\n\
00405 --sends N\tThe number of sends to perform.\n\
00406 --threads N\tNumber of threads competing for single cromp link.\n\
00407 --grab N\tNumber of additional threads stressing retrievals.\n\
00408 --trysend N\tCount of tries for sending if not all data went out.\n\
00409 --print N\tItems handled in between showing send counter.\n\
00410 --dataseg N\tSize of extra data packed in each test packet.\n\
00411 --report N\tDuration of time between reports, in seconds.\n\
00412 --snooze N\tSleep N ms between each send; this invalidates timing info.\n\
00413 --rpc\tEmulate Remote Procedure Call by awaiting each response.\n\
00414 -R\t\tditto\n\
00415 "));
00416 return -3;
00417 }
00418
00419 void cromp_client_tester::look_for_receipts(int count,
00420 basis::set<octopus_request_id> &ids,
00421 basis::set<octopus_request_id> &delinquents, bool wait)
00422 {
00423 FUNCDEF("look_for_receipts");
00424 infoton *received = NIL;
00425 while (count--) {
00426 if (!ids.length()) break;
00427 octopus_request_id the_id = ids[0];
00428 ids.zap(0, 0);
00429
00430 time_stamp start_acquire;
00431 int delay = MAXIMUM_ACQUISITION_DELAY;
00432 if (wait) delay = 2 * MINUTE_ms;
00433 outcome ret = _uplink->acquire(received, the_id, delay);
00434 int acquire_duration = int(time_stamp().value() - start_acquire.value());
00435 if (acquire_duration >= MAXIMUM_ACQUISITION_DELAY - 1) {
00436 LOG("passed time limit for acquire! this is the faux dual-cpu bug!");
00437 LOG(isprintf("there were %d items left to acquire.", count));
00438 LOG(isprintf("pending %d bytes to send, %d bytes accumulated.",
00439 _uplink->pending_sends(), _uplink->accumulated_bytes()));
00440 LOG(isprintf("the data bin had %d items awaiting pickup.",
00441 _uplink->octo()->responses().items_held()));
00442 if (ret != cromp_client::TIMED_OUT) {
00443 LOG("cromp client lied about outcome?? didn't call this timed out!!");
00444 }
00445 }
00446
00447 if (ret != cromp_client::OKAY) {
00448 if (ret != cromp_client::TIMED_OUT) {
00449 LOG(istring("failed to acquire the response--got error ")
00450 + cromp_client::outcome_name(ret));
00451
00452 ids += the_id;
00453 LOG(isprintf("moved %s back to main id queue.", the_id.text_form().s()));
00454 } else {
00455 if (delinquents.member(the_id))
00456 deadly_error(class_name(), func,
00457 istring("a delinquent response is still missing: ")
00458 + the_id.text_form());
00459
00460 delinquents += the_id;
00461 LOG(isprintf("added %s to delinquents.", the_id.text_form().s()));
00462 }
00463 return;
00464 }
00465
00466 bubble *cast = dynamic_cast<bubble *>(received);
00467 if (!cast) {
00468 deadly_error(class_name(), func, istring("got the wrong type "
00469 "of response: ") + received->classifier().text_form());
00470 }
00471
00472
00473
00474 if (delinquents.member(the_id))
00475 delinquents.remove(the_id);
00476 WHACK(received);
00477 }
00478 }
00479
00480 void cromp_client_tester::bite_server(basis::set<octopus_request_id> &ids,
00481 basis::set<octopus_request_id> &delinquents,
00482 void *originator)
00483 {
00484 FUNCDEF("bite_server");
00485 octopus_request_id cmd_id;
00486
00488
00489 outcome ret;
00490
00491 double overall_sent = 0;
00492
00493
00494 byte_array temp;
00495 bubble test_size(_dataseg_size, screen_rectangle(0, 120, 220, 280),
00496 238843);
00497 test_size.data().reset();
00498
00499 test_size.pack(temp);
00500 int base_length = temp.length();
00501
00502
00503 int failure_count = 0;
00504
00505 time_stamp start;
00506
00507 for (int sends = 1; sends <= _send_count; sends++) {
00508 bubble to_send(_dataseg_size, screen_rectangle(0, 120, 220, 280),
00509 238843);
00510 int curr_sending = to_send.data_length() + base_length * 2;
00511 overall_sent += curr_sending;
00512
00513
00514
00515
00516
00517 ret = _uplink->submit(to_send, cmd_id, _send_tries);
00518 switch (ret.value()) {
00519 case cromp_client::OKAY: {
00520
00521 ids.add(cmd_id);
00522 if (_rpc_style) {
00523
00524 look_for_receipts(1, ids, delinquents, true);
00525 }
00526
00527 if (_snooze_duration) {
00528 _uplink->keep_alive_pause(_snooze_duration, 60);
00529 look_for_receipts(1, ids, delinquents);
00530 }
00531 break;
00532 }
00533 case cromp_client::TOO_FULL: {
00534
00535 BASE_LOG("got too full outcome!");
00536 sends--;
00537 overall_sent -= curr_sending;
00538 continue;
00539 break;
00540 }
00541 case cromp_client::TIMED_OUT: {
00542
00543 BASE_LOG("got timed out outcome!");
00544 sends--;
00545 overall_sent -= curr_sending;
00546 continue;
00547 break;
00548 }
00549 default: {
00550
00551 if (failure_count++ < 20) {
00552 sends--;
00553 overall_sent -= curr_sending;
00554 LOG(istring("got failure outcome ") + cromp_client::outcome_name(ret)
00555 + " from attempt to submit request.");
00556 if (_snooze_duration) {
00557 _uplink->keep_alive_pause(_snooze_duration, 60);
00558 }
00559 continue;
00560 }
00561 deadly_error(class_name(), func,
00562 istring("failed to submit the request--got error ")
00563 + cromp_client::outcome_name(ret));
00564 break;
00565 }
00566 }
00567
00568 _finished_loops += 1.0;
00569
00570 if (ids.elements() > MAXIMUM_PENDING_REQUESTS) {
00571
00572 look_for_receipts(PENDING_REQUESTS_FORCED, ids, delinquents);
00573 }
00574
00575 if (! (sends % _checkpoint_count)) {
00576 BASE_LOG(isprintf("%x send #%d", originator, sends));
00577 }
00578 }
00579 BASE_LOG(isprintf("%x final send #%d", originator, _send_count));
00580
00583
00584 look_for_receipts(ids.elements(), ids, delinquents);
00585
00586 LOG(isprintf("concluded %d test requests and responses.", _send_count));
00587 }
00588
00589 void cromp_client_tester::grab_items()
00590 {
00591 FUNCDEF("grab_items");
00592 octopus_request_id id(_uplink->entity(), -12);
00593
00594 infoton *found = NIL;
00595 outcome ret = _uplink->retrieve_and_restore(found, id, 0);
00596 WHACK(found);
00597 }
00598
00599 void cromp_client_tester::report(const time_stamp &start_time,
00600 double bytes_transmitted, double conversations)
00601 {
00602 FUNCDEF("report");
00603 throughput_counter bandwidth;
00604 double duration = time_stamp().value() - start_time.value();
00605
00606 bandwidth.add_run(bytes_transmitted, duration, conversations * 2);
00607
00608
00609
00610
00611
00612
00613 double bytes_per_item = bandwidth.bytes_sent() / bandwidth.number_of_sends();
00614
00615 bubble my_bubble(_dataseg_size);
00616
00617
00618 byte_array packed_classifier;
00619 basis::pack(packed_classifier, my_bubble.classifier());
00620 double classifier_size = packed_classifier.length() - sizeof(int);
00621
00622
00623
00624
00625 double payload_portion = my_bubble.packed_size() + classifier_size;
00626
00627
00628 double overhead = bytes_per_item - payload_portion;
00629
00630
00631 double percent_overhead = overhead / bytes_per_item;
00632
00633
00634 #if 0
00635
00636 byte_array packed_infote;
00637 infoton::fast_pack(packed_infote, my_bubble);
00638 log(isprintf("sane? -- overhead for just packed infoton is %d bytes.",
00639 packed_infote.length() - payload_portion));
00640 octopus_request_id example_request(_uplink->entity(), 23982);
00641 byte_array packed_req_id;
00642 example_request.pack(packed_req_id);
00643 log(isprintf(" -- overhead for octo request id is %d bytes.",
00644 packed_req_id.length()));
00645 byte_array packed_transa;
00646 cromp_transaction::flatten(packed_transa, my_bubble,
00647 octopus_request_id(_uplink->entity(), 23982));
00648 log(isprintf(" -- overhead for cromp transation is %d bytes.",
00649 packed_transa.length() - payload_portion));
00650 #endif
00651
00652 BASE_LOG(isprintf("sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
00653 "payload %.0f bytes, overhead %.0f bytes, percent overhead %.1f%%,%s"
00654 "in %.2f seconds is %f ms/item%s"
00655 "at %.2f %cb/sec & %.2f items/sec.",
00656 bandwidth.number_of_sends(), bandwidth.bytes_sent(),
00657 bytes_per_item,
00658 log_base::platform_ending(),
00659 payload_portion, overhead, percent_overhead * 100.0,
00660 log_base::platform_ending(),
00661 bandwidth.total_time() / SECOND_ms,
00662 bandwidth.total_time() / bandwidth.number_of_sends(),
00663 log_base::platform_ending(),
00664 (bandwidth.kilobytes_per_second() < 1024.0?
00665 bandwidth.kilobytes_per_second()
00666 : bandwidth.megabytes_per_second()),
00667 (bandwidth.kilobytes_per_second() < 1024.0? 'K' : 'M'),
00668 bandwidth.number_of_sends() / (bandwidth.total_time() / SECOND_ms)));
00669 }
00670
00671 void cromp_client_tester::cause_object_reconstruction()
00672 {
00673 FUNCDEF("cause_object_reconstruction");
00674 int rando = chaos().inclusive(1, 100);
00675 if (rando > CHANCE_OF_RECONSTRUCT) return;
00676 LOG(istring("reconstructing client at ") + _server_loc.text_form());
00677 WHACK(_uplink);
00678 _uplink = new cromp_client(_server_loc);
00679 }
00680
00681 int cromp_client_tester::execute()
00682 {
00683 FUNCDEF("execute");
00684
00685
00686 bubble fud(randomizer().inclusive(12, 2829));
00687 byte_array packed_fud;
00688 fud.pack(packed_fud);
00689 if (packed_fud.length() != fud.packed_size())
00690 deadly_error(class_name(), func, "bubble's packed size method is wrong.");
00691
00692 if (_encryption) _uplink->enable_encryption();
00693
00694 outcome ret = _uplink->connect();
00695 if (ret != cromp_client::OKAY) {
00696 deadly_error(class_name(), func, istring("connection failed with error: ")
00697 + cromp_client::outcome_name(ret));
00698 }
00699
00700 thread_cabinet cab;
00701
00702 LOG(isprintf("adding %d grabber threads to test.", _grabber_count));
00703
00704
00705 for (int i = 0; i < _grabber_count; i++) {
00706 grabby_thread *to_add = new grabby_thread(*this);
00707 cab.add_thread(to_add, false, NIL);
00708 }
00709
00710 LOG(isprintf("adding %d transmitter threads to test.", _thread_count));
00711
00712
00713 for (int j = 0; j < _thread_count; j++) {
00714 bitey_thread *to_add = new bitey_thread(*this);
00715 cab.add_thread(to_add, false, NIL);
00716 }
00717
00718
00719 time_stamp start;
00720 cab.start_all(NIL);
00721
00722
00723 portable::sleep_ms(400);
00724
00725
00726
00727 while (cab.any_running()) {
00728 portable::sleep_ms(30);
00729 if (!_threads_active) {
00730 break;
00731 }
00732
00733 if (time_stamp(-_report_interval) > _last_report) {
00734 report(start, cromp_common::total_bytes_sent()
00735 + cromp_common::total_bytes_received(),
00736 _finished_loops);
00737 _last_report.reset();
00738 }
00739 if (_reconstruct_object) {
00740 cause_object_reconstruction();
00741 }
00742 }
00743
00744 LOG("- done testing -");
00745
00746 if (_finished_loops != double(_thread_count) * _send_count)
00747 LOG(isprintf("number of loops was calculated differently: wanted %d, "
00748 "got %d", _thread_count * _send_count, _finished_loops));
00749
00750 report(start, cromp_common::total_bytes_sent()
00751 + cromp_common::total_bytes_received(),
00752 _thread_count * _send_count);
00753
00754
00755 cab.stop_all();
00756 LOG("all threads exited.");
00757
00758 #ifdef DEBUG_TESTER
00761 #endif
00762
00763 BASE_LOG("cromp_client:: works for those functions tested.");
00764
00765 return 0;
00766 }
00767
00769
// Program entry point for the tester.
// NOTE(review): presumably this macro expands to a main() that instantiates
// cromp_client_tester and runs its execute() method -- confirm against the
// macro's definition.
HOOPLE_MAIN(cromp_client_tester, )
00771