t_cromp_client.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : test_cromp_client                                                 *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include "crompish_pax.h"
00016 
00017 #include <basis/chaos.h>
00018 #include <basis/istring.h>
00019 #include <basis/portable.h>
00020 #include <basis/set.cpp>
00021 #include <cromp/cromp_client.h>
00022 #include <mechanisms/ithread.h>
00023 #include <mechanisms/thread_cabinet.h>
00024 #include <mechanisms/throughput_counter.h>
00025 #include <octopus/entity_data_bin.h>
00026 #include <octopus/entity_defs.h>
00027 #include <octopus/infoton.h>
00028 #include <opsystem/application_shell.h>
00029 #include <opsystem/command_line.h>
00030 #include <loggers/console_logger.h>
00031 #include <loggers/file_logger.h>
00032 #include <opsystem/filename.h>
00033 #include <opsystem/rendezvous.h>
00034 #include <data_struct/static_memory_gremlin.h>
00035 #include <sockets/address.h>
00036 
00037 #include <stdlib.h>
00038 
00039 #undef LOG
00040 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00041 
00042 #define DEBUG_TESTER
00043   // uncomment for noisier version.
00044 
00045 // the number of transactions to send during a test.  if timing connection
00046 // duration, then use a maximum of 1.  if timing speed of operation once
00047 // connected, use a large number.
00048 //const int MAXIMUM_SENDS = 10008;
00049 const int MAXIMUM_SENDS = 100008;
00050 //const int MAXIMUM_SENDS = 10000008;
00051   // have had success with up to 10000000 sends using small data segments.
00052 
00053 const int NUMBER_OF_THREADS = 1;
00054 //const int NUMBER_OF_THREADS = 10;
00055 //const int NUMBER_OF_THREADS = 20;
00056   // the number of simultaneous actors on the single cromp_client.
00057 
00058 //const int GRABBER_THREADS = 5;
00059 const int GRABBER_THREADS = 0;
00060   // the number of threads that just pluck at the cromp_client trying to
00061   // interfere with the testing threads.
00062 
00063 //const int MAX_SEND_TRIES = 0;  // don't pause.
00064 const int MAX_SEND_TRIES = 1;  // try to get stuff out but don't wait long.
00065 //const int MAX_SEND_TRIES = 5;  // wait a reasonable amount of times to send.
00066 //const int MAX_SEND_TRIES = 10000;  // force it to get out, hopefully.
00067   // the number of times we try to push the sends out.  zero means never
00068   // try to push anything, just add it to the buffer.  1 or more is that
00069   // many tries to push the send.
00070 
00071 //const int CHECKPOINT_SIZE = 1000;
00072 //const int CHECKPOINT_SIZE = 100;
00073 const int CHECKPOINT_SIZE = 100;
00074   // prints a counter out when we reach a multiple of this many sends.
00075 
00076 //const int DATA_SEGMENT_SIZE = 0;
00077 const int DATA_SEGMENT_SIZE = 64;
00078 //const int DATA_SEGMENT_SIZE = 128 * KILOBYTE;
00079 //const int DATA_SEGMENT_SIZE = 1 * MEGABYTE;
00080   // the chunk size that we attach.
00081 
00082 const int REPORTING_INTERVAL = 10 * SECOND_ms;
00083   // this is the period between reports on how the test is going.
00084 
00085 //***this is where we are in testing the faux dual cpu problem.
00086 //***the longer delay shows the problem more easily.  the shorter delay
00087 //***is being used for a long test.
00088 const int MAXIMUM_ACQUISITION_DELAY = 8 * SECOND_ms;
00089 //const int MAXIMUM_ACQUISITION_DELAY = int(0.5 * SECOND_ms);
00090   // the longest we'll snooze off waiting for pending receptions to occur.
00091 
00092 //const int MAXIMUM_PENDING_REQUESTS = 1;
00093 const int MAXIMUM_PENDING_REQUESTS = 108;
00094   // this is a threshold for the number of requests; once hit, we start
00095   // awaiting the responses.
00096 
00097 //const int PENDING_REQUESTS_FORCED = 3;
00098 //const int PENDING_REQUESTS_FORCED = 80;
00099 const int PENDING_REQUESTS_FORCED = MAXIMUM_PENDING_REQUESTS;
00100   // when we've been forced to gather some pending responses to previous
00101   // requests, this is how many we'll try to get at once.  numbers closer
00102   // to the MAXIMUM_PENDING_REQUESTS will force more synchrony.
00103 
00104 const int CHANCE_OF_RECONSTRUCT = 14;
00105   // how frequently a bus reconstruction occurs, in 1000.
00106 
00107 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00108 #define BASE_LOG(s) EMERGENCY_LOG(program_wide_logger(), s)
00109 
00110 class cromp_client_tester : public application_shell
00111 {
00112 public:
00113   cromp_client_tester();
00114   ~cromp_client_tester();
00115 
00116   virtual int execute();
00117 
00118   IMPLEMENT_CLASS_NAME("cromp_client_tester");
00119 
00120   void bite_server(basis::set<octopus_request_id> &ids,
00121           basis::set<octopus_request_id> &delinquents, void *originator);
00122     // performs the big chunk of testing.  the "ids" are the history of the
00123     // sends that were made and they're managed by this method.  the
00124     // "originator" is a tag we can use to generate unique print outs.
00125 
00126   int print_instructions();
00128 
00129   void grab_items();
00131 
00132   void cause_object_reconstruction();
00134 
00135   void increment_thread_count() {
00136     FUNCDEF("increment_thread_count");
00137     auto_synchronizer l(*_lock);
00138     _threads_active++;
00139 //LOG(isprintf("count now %d", _threads_active));
00140   }
00141 
00142   void decrement_thread_count() {
00143     FUNCDEF("decrement_thread_count");
00144     auto_synchronizer l(*_lock);
00145     _threads_active--;
00146 //LOG(isprintf("count now %d", _threads_active));
00147   }
00148 
00149   void report(const time_stamp &start_time, double bytes_transmitted,
00150           double conversations);
00151     // describes how the test is going.
00152 
00153 private:
00154   cromp_client *_uplink;  // provides the connection and transmission services.
00155 
00156   mutex *_lock;  // protects the objects below.
00157   int _threads_active;  // the number of transmitter threads running.
00158   time_stamp _last_report;  // when we last reported on progress.
00159   double _finished_loops;  // counts number of loops we've achieved.
00160   bool _encryption;  // true if we're encrypting.
00161   int _send_count;  
00162   int _thread_count;  
00163   int _grabber_count;  
00164   int _send_tries;  
00165   int _checkpoint_count;  
00166   int _dataseg_size;  
00167   int _report_interval;  
00168   int _snooze_duration;  
00169   bool _rpc_style;  
00170   bool _reconstruct_object;  
00171   internet_address _server_loc;  
00172 
00173   void look_for_receipts(int count, basis::set<octopus_request_id> &ids,
00174           basis::set<octopus_request_id> &delinquents, bool wait = false);
00175     // attempts to get "count" items from the list of "ids".
00176 };
00177 
00179 
00180 class bitey_thread : public ithread
00181 {
00182 public:
00183   bitey_thread(cromp_client_tester &parent)
00184       : ithread(), _parent(parent) {}
00185 
00186   void perform_activity(void *formal(ptr)) {
00187     FUNCDEF("perform_activity");
00188     _parent.increment_thread_count();
00189     _parent.bite_server(_ids, _delinquents, this);
00190     _parent.decrement_thread_count();
00191   }
00192 
00193 private:
00194   cromp_client_tester &_parent;
00195   basis::set<octopus_request_id> _ids;  // the ids for commands we've sent.
00196   basis::set<octopus_request_id> _delinquents;  // missing ids during rcv.
00197 };
00198 
00200 
00201 //hmmm: next stop; inject the types of items they're expecting in grab_items.
00202 
00203 class grabby_thread : public ithread
00204 {
00205 public:
00206   grabby_thread(cromp_client_tester &parent)
00207   : ithread(), _parent(parent) {}
00208 
00209   void perform_activity(void *formal(ptr)) {
00210     while (!should_stop()) {
00211       _parent.grab_items();
00212       if (_rando.inclusive(0, 100) > 10)
00213         portable::sleep_ms(_rando.inclusive(5, 38));
00214     }
00215   }
00216 
00217 private:
00218   cromp_client_tester &_parent;
00219   chaos _rando;
00220 };
00221 
00223 
00224 cromp_client_tester::cromp_client_tester()
00225 : application_shell("cromp_client_tester"),
00226   _uplink(NIL),
00227   _lock(new mutex),
00228   _threads_active(0),
00229   _finished_loops(0.0),
00230   _encryption(false),
00231   _send_count(0),
00232   _thread_count(0),
00233   _grabber_count(0),
00234   _send_tries(0),
00235   _checkpoint_count(0),
00236   _dataseg_size(0),
00237   _report_interval(0),
00238   _snooze_duration(0),
00239   _rpc_style(false),
00240   _reconstruct_object(false),
00241   _server_loc()
00242 {
00243   FUNCDEF("constructor");
00244   LOG("");
00245   LOG("");
00246 
00247   command_line args(__argc, __argv);
00248 //LOG(isprintf("argc is %d and first is %s", __argc, __argv[0]));
00249 
00250   int indy = 0;
00251   if (args.find("help", indy, false)
00252       || (args.find("?", indy, false))
00253       || (args.find('?', indy, false)) ) {
00254     print_instructions();
00255     exit(0);
00256   }
00257 
00258   // check for a port on the command line.
00259   istring port_text;
00260   int port = 5678;
00261   if (args.get_value("port", port_text, false)) {
00262     LOG(istring("using port: ") + port_text);
00263     port = port_text.convert(5678);
00264   }
00265   _server_loc.port = port;
00266 
00267 //hmmm:normalize host so this can take either name or IP.
00268 
00269   indy = 0;
00270   if (args.find("encrypt", indy, false) || (args.find('e', indy, true)) ) {
00271     // they're saying that we should encrypt the communication.
00272     LOG("turning on encryption.");
00273     _encryption = true;
00274   }
00275 
00276   indy = 0;
00277   if (args.find("rpc", indy, false) || (args.find('R', indy, true)) ) {
00278     // this is telling us to turn on RPC mode.  we will make each request and
00279     // reply pair synchronous, i.e., each reply will be awaited for when a
00280     // request has been made.
00281     LOG("turning on RPC style requests.");
00282     _rpc_style = true;
00283   }
00284 
00285   // check for a hostname on the command line.
00286   istring hostname("local");
00287   istring host_temp;
00288   if (args.get_value("host", host_temp, false)) {
00289     LOG(istring("using host: ") + host_temp);
00290     hostname = host_temp;
00291   }
00292 LOG(istring("using host: ") + hostname);
00293   strcpy(_server_loc.hostname, hostname.s());
00294 
00295   istring send_temp;
00296   int send_count = MAXIMUM_SENDS;
00297   if (args.get_value("sends", send_temp, false)) {
00298     LOG(istring("using send count: ") + send_temp);
00299     send_count = send_temp.convert(send_count);
00300     if (send_count <= 0) send_count = 1;
00301   }
00302   _send_count = send_count;
00303 
00304   istring thread_temp;
00305   int thread_count = NUMBER_OF_THREADS;
00306   if (args.get_value("threads", thread_temp, false)) {
00307     LOG(istring("using thread count: ") + thread_temp);
00308     thread_count = thread_temp.convert(thread_count);
00309     if (thread_count <= 0) thread_count = 1;
00310   }
00311   _thread_count = thread_count;
00312 
00313   istring grabber_temp;
00314   int grabber_count = GRABBER_THREADS;
00315   if (args.get_value("grab", grabber_temp, false)) {
00316     LOG(istring("using grabber count: ") + grabber_temp);
00317     grabber_count = grabber_temp.convert(grabber_count);
00318     if (grabber_count < 0) grabber_count = 0;
00319   }
00320   _grabber_count = grabber_count;
00321 
00322   istring send_tries_temp;
00323   int send_tries = MAX_SEND_TRIES;
00324   if (args.get_value("trysend", send_tries_temp, false)) {
00325     LOG(istring("using send tries: ") + send_tries_temp);
00326     send_tries = send_tries_temp.convert(send_tries);
00327     if (send_tries < 0) send_tries = 0;
00328   }
00329   _send_tries = send_tries;
00330 
00331 //hmmm: how tiresome.  how about a macro here?  could help in general
00332 //      with command_line also.
00333 
00334   istring checkpoint_temp;
00335   int checkpoint_count = CHECKPOINT_SIZE;
00336   if (args.get_value("print", checkpoint_temp, false)) {
00337     LOG(istring("using checkpoint count: ") + checkpoint_temp);
00338     checkpoint_count = checkpoint_temp.convert(checkpoint_count);
00339     if (checkpoint_count <= 0) checkpoint_count = 1;
00340   }
00341   _checkpoint_count = checkpoint_count;
00342 
00343   istring dataseg_temp;
00344   int dataseg_size = DATA_SEGMENT_SIZE;
00345   if (args.get_value("dataseg", dataseg_temp, false)) {
00346     LOG(istring("using dataseg size: ") + dataseg_temp);
00347     dataseg_size = dataseg_temp.convert(dataseg_size);
00348     if (dataseg_size < 0) dataseg_size = 0;
00349   }
00350   _dataseg_size = dataseg_size;
00351 
00352   istring report_temp;
00353   int report_interval = REPORTING_INTERVAL;
00354   if (args.get_value("report", report_temp, false)) {
00355     LOG(istring("using report interval: ") + report_temp);
00356     report_interval = report_temp.convert(report_interval);
00357     if (report_interval <= 0) report_interval = 1;
00358     report_interval *= SECOND_ms;  // convert to milliseconds.
00359   }
00360   _report_interval = report_interval;
00361 
00362   istring snooze_temp;
00363   int snooze_duration = 0;  // no snooze by default.
00364   if (args.get_value("snooze", snooze_temp, false)) {
00365     LOG(istring("using snooze duration: ") + snooze_temp);
00366     snooze_duration = snooze_temp.convert(snooze_duration);
00367     if (snooze_duration < 0) snooze_duration = 0;
00368   }
00369   _snooze_duration = snooze_duration;
00370 
00371   if (args.find("reconstruct", indy, false)) {
00372     LOG("saw reconstruct flag; will periodically tear down object.");
00373     _reconstruct_object = true;
00374   }
00375 
00376 LOG(istring("opening at ") + _server_loc.text_form());
00377   _uplink = new cromp_client(_server_loc);
00378 
00379   _uplink->add_tentacle(new bubbles_tentacle(false));
00380 //we don't need backgrounding right now.
00381 }
00382 
00383 cromp_client_tester::~cromp_client_tester()
00384 {
00385   WHACK(_lock);
00386   WHACK(_uplink);
00387 }
00388 
00389 int cromp_client_tester::print_instructions()
00390 {
00391   istring name = filename(__argv[0]).basename().raw();
00392   log(isprintf("%s usage:", name.s()));
00393   log("");
00394   log(isprintf("\
00395 This program connects to a cromp test server and exchanges packets to test\n\
00396 the performance of the cromp protocol.  All command line flags are optional\n\
00397 but can be added to specify how the test should be performed.  Currently,\n\
00398 the valid options are:\n\
00399    --help\tShow this set of command-line help.\n\
00400    -?\t\tditto.\n\
00401    --port N\tConnect to the server on the port specified.\n\
00402    --host X\tConnect to server at IP address or hostname X.\n\
00403    --encrypt\tEncrypt the connection.  Server must do this also.\n\
00404    -e\t\tditto.\n\
00405    --sends N\tThe number of sends to perform.\n\
00406    --threads N\tNumber of threads competing for single cromp link.\n\
00407    --grab N\tNumber of additional threads stressing retrievals.\n\
00408    --trysend N\tCount of tries for sending if not all data went out.\n\
00409    --print N\tItems handled in between showing send counter.\n\
00410    --dataseg N\tSize of extra data packed in each test packet.\n\
00411    --report N\tDuration of time between reports, in seconds.\n\
00412    --snooze N\tSleep N ms between each send; this invalidates timing info.\n\
00413    --rpc\tEmulate Remote Procedure Call by awaiting each response.\n\
00414    -R\t\tditto\n\
00415 "));
00416   return -3;
00417 }
00418 
00419 void cromp_client_tester::look_for_receipts(int count,
00420     basis::set<octopus_request_id> &ids,
00421     basis::set<octopus_request_id> &delinquents, bool wait)
00422 {
00423   FUNCDEF("look_for_receipts");
00424   infoton *received = NIL;
00425   while (count--) {
00426     if (!ids.length()) break;  // nothing to check on.
00427     octopus_request_id the_id = ids[0];
00428     ids.zap(0, 0);  // take out the one we're inspecting right now.
00429 
00430     time_stamp start_acquire;
00431     int delay = MAXIMUM_ACQUISITION_DELAY;
00432     if (wait) delay = 2 * MINUTE_ms;  // force a long delay.
00433     outcome ret = _uplink->acquire(received, the_id, delay);
00434     int acquire_duration = int(time_stamp().value() - start_acquire.value());
00435     if (acquire_duration >= MAXIMUM_ACQUISITION_DELAY - 1) {
00436       LOG("passed time limit for acquire!  this is the faux dual-cpu bug!");
00437       LOG(isprintf("there were %d items left to acquire.", count));
00438       LOG(isprintf("pending %d bytes to send, %d bytes accumulated.",
00439           _uplink->pending_sends(), _uplink->accumulated_bytes()));
00440       LOG(isprintf("the data bin had %d items awaiting pickup.",
00441           _uplink->octo()->responses().items_held()));
00442       if (ret != cromp_client::TIMED_OUT) {
00443         LOG("cromp client lied about outcome??  didn't call this timed out!!");
00444       }
00445     }
00446 
00447     if (ret != cromp_client::OKAY) {
00448       if (ret != cromp_client::TIMED_OUT) {
00449         LOG(istring("failed to acquire the response--got error ")
00450                 + cromp_client::outcome_name(ret));
00451         // give it another chance later.
00452         ids += the_id;
00453 LOG(isprintf("moved %s back to main id queue.", the_id.text_form().s()));
00454       } else {
00455         if (delinquents.member(the_id))
00456           deadly_error(class_name(), func,
00457               istring("a delinquent response is still missing: ")
00458               + the_id.text_form());
00459         // if we hadn't already seen it, we'll watch for it next time.
00460         delinquents += the_id;
00461 LOG(isprintf("added %s to delinquents.", the_id.text_form().s()));
00462       }
00463       return;
00464     }
00465     // check that the right type is coming back to us.
00466     bubble *cast = dynamic_cast<bubble *>(received);
00467     if (!cast) {
00468       deadly_error(class_name(), func, istring("got the wrong type "
00469           "of response: ") + received->classifier().text_form());
00470     }
00471  
00472     // if we had a problem with this item earlier, we remove it since it
00473     // succeeded this time.
00474     if (delinquents.member(the_id))
00475       delinquents.remove(the_id);
00476     WHACK(received);
00477   }
00478 }
00479 
00480 void cromp_client_tester::bite_server(basis::set<octopus_request_id> &ids,
00481     basis::set<octopus_request_id> &delinquents,
00482     void *originator)
00483 {
00484   FUNCDEF("bite_server");
00485   octopus_request_id cmd_id;
00486 
00488 
00489   outcome ret;
00490 
00491   double overall_sent = 0;
00492 
00493   // this computes the size of the exchange object with no extra data attached.
00494   byte_array temp;
00495   bubble test_size(_dataseg_size, screen_rectangle(0, 120, 220, 280),
00496       238843);
00497   test_size.data().reset();
00498     // set the data segment to zero length.
00499   test_size.pack(temp);
00500   int base_length = temp.length();
00501     // this is the base packed length of the bubble object.
00502 
00503   int failure_count = 0;
00504 
00505   time_stamp start;  // record when our testing started.
00506 
00507   for (int sends = 1; sends <= _send_count; sends++) {
00508     bubble to_send(_dataseg_size, screen_rectangle(0, 120, 220, 280),
00509         238843);
00510     int curr_sending = to_send.data_length() + base_length * 2;
00511     overall_sent += curr_sending;
00512       // we compute the overall sent by what's sent in the request (which is
00513       // of the base length plus the attached array size) and the reply (which
00514       // is the base length only since the server resets the data attachment).
00515       // we go ahead and count it as sent before the send, since we're going
00516       // to bomb out if the send doesn't work.
00517     ret = _uplink->submit(to_send, cmd_id, _send_tries);
00518     switch (ret.value()) {
00519       case cromp_client::OKAY: {
00520         // complete success in sending that chunk out.
00521         ids.add(cmd_id);  // record it.
00522         if (_rpc_style) {
00523           // this call is used to force single requests and replies RPC style.
00524           look_for_receipts(1, ids, delinquents, true);
00525         }
00526         // sleep if we were asked to.
00527         if (_snooze_duration) {
00528           _uplink->keep_alive_pause(_snooze_duration, 60);
00529           look_for_receipts(1, ids, delinquents);
00530         }
00531         break;
00532       }
00533       case cromp_client::TOO_FULL: {
00534 //treating as failure right now.
00535 BASE_LOG("got too full outcome!");
00536         sends--;
00537         overall_sent -= curr_sending;
00538         continue;
00539         break;
00540       }
00541       case cromp_client::TIMED_OUT: {
00542 //treating as failure right now.
00543 BASE_LOG("got timed out outcome!");
00544         sends--;
00545         overall_sent -= curr_sending;
00546         continue;
00547         break;
00548       }
00549       default: {
00550         // a failure case that we have no other handling for.
00551         if (failure_count++ < 20) {
00552           sends--;  // skip back for the failed one.
00553           overall_sent -= curr_sending;  // remove unsent portion.
00554           LOG(istring("got failure outcome ") + cromp_client::outcome_name(ret)
00555               + " from attempt to submit request.");
00556           if (_snooze_duration) {
00557             _uplink->keep_alive_pause(_snooze_duration, 60);
00558           }
00559           continue;  // try again.
00560         }
00561         deadly_error(class_name(), func,
00562             istring("failed to submit the request--got error ")
00563                 + cromp_client::outcome_name(ret));
00564         break;
00565       }
00566     }
00567 
00568     _finished_loops += 1.0;
00569 
00570     if (ids.elements() > MAXIMUM_PENDING_REQUESTS) {
00571       // grab some of the items waiting.  hopefully they're back by now.
00572       look_for_receipts(PENDING_REQUESTS_FORCED, ids, delinquents);
00573     }
00574 
00575     if (! (sends % _checkpoint_count)) {
00576       BASE_LOG(isprintf("%x send #%d", originator, sends));
00577     }
00578   }
00579   BASE_LOG(isprintf("%x final send #%d", originator, _send_count));
00580 
00583 
00584   look_for_receipts(ids.elements(), ids, delinquents);
00585 
00586   LOG(isprintf("concluded %d test requests and responses.", _send_count));
00587 }
00588 
00589 void cromp_client_tester::grab_items()
00590 {
00591   FUNCDEF("grab_items");
00592   octopus_request_id id(_uplink->entity(), -12);
00593     // look for an id we don't expect to have any thing waiting for.
00594   infoton *found = NIL;
00595   outcome ret = _uplink->retrieve_and_restore(found, id, 0);
00596   WHACK(found);
00597 }
00598 
00599 void cromp_client_tester::report(const time_stamp &start_time,
00600     double bytes_transmitted, double conversations)
00601 {
00602   FUNCDEF("report");
00603   throughput_counter bandwidth;  // calculator for communication speed.
00604   double duration = time_stamp().value() - start_time.value();
00605     // the elapsed duration so far.
00606   bandwidth.add_run(bytes_transmitted, duration, conversations * 2);
00607     // create a portrait of how the run has progressed.  we multiply the
00608     // conversations by two since we are counting both the request and the
00609     // response (send and receive) as a transfer.
00610 
00611   // calculate the number of bytes per item for real as it plays out in
00612   // cromp sending.
00613   double bytes_per_item = bandwidth.bytes_sent() / bandwidth.number_of_sends();
00614 
00615   bubble my_bubble(_dataseg_size);  // an exemplar for our sends.
00616 
00617   // calculate how much space bubble's naming takes up.
00618   byte_array packed_classifier;
00619   basis::pack(packed_classifier, my_bubble.classifier());
00620   double classifier_size = packed_classifier.length() - sizeof(int);
00621     // that's how much space is used by our goofy classifier name.  there are
00622     // a few bytes extra overhead for packing a string array and we remove
00623     // them from consideration; we only want credit for the name, since that
00624     // is not truly overhead, given that the bubble infoton chose it.
00625   double payload_portion = my_bubble.packed_size() + classifier_size;
00626     // calculate the portion of our transmissions that are solely the
00627     // result of what we are putting into the package.
00628   double overhead = bytes_per_item - payload_portion;
00629     // okay, this is how many bytes per item is cromp noise, rather than
00630     // something the user is responsible for.
00631   double percent_overhead = overhead / bytes_per_item;
00632 
00633 // change 0 to 1 to enable this section of information.
00634 #if 0
00635   // get additional facts about how much of a packed infoton is wasted.
00636   byte_array packed_infote;
00637   infoton::fast_pack(packed_infote, my_bubble);
00638   log(isprintf("sane? -- overhead for just packed infoton is %d bytes.",
00639       packed_infote.length() - payload_portion));
00640   octopus_request_id example_request(_uplink->entity(), 23982);
00641   byte_array packed_req_id;
00642   example_request.pack(packed_req_id);
00643   log(isprintf("  -- overhead for octo request id is %d bytes.",
00644       packed_req_id.length()));
00645   byte_array packed_transa;
00646   cromp_transaction::flatten(packed_transa, my_bubble,
00647       octopus_request_id(_uplink->entity(), 23982));
00648   log(isprintf("  -- overhead for cromp transation is %d bytes.",
00649       packed_transa.length() - payload_portion));
00650 #endif
00651 
00652   BASE_LOG(isprintf("sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
00653       "payload %.0f bytes, overhead %.0f bytes, percent overhead %.1f%%,%s"
00654       "in %.2f seconds is %f ms/item%s"
00655       "at %.2f %cb/sec & %.2f items/sec.",
00656       bandwidth.number_of_sends(), bandwidth.bytes_sent(),
00657       bytes_per_item,
00658       log_base::platform_ending(),
00659       payload_portion, overhead, percent_overhead * 100.0,
00660       log_base::platform_ending(),
00661       bandwidth.total_time() / SECOND_ms,
00662       bandwidth.total_time() / bandwidth.number_of_sends(),
00663       log_base::platform_ending(),
00664       (bandwidth.kilobytes_per_second() < 1024.0? 
00665         bandwidth.kilobytes_per_second()
00666         : bandwidth.megabytes_per_second()),
00667       (bandwidth.kilobytes_per_second() < 1024.0? 'K' : 'M'),
00668       bandwidth.number_of_sends() / (bandwidth.total_time() / SECOND_ms)));
00669 }
00670 
00671 void cromp_client_tester::cause_object_reconstruction()
00672 {
00673   FUNCDEF("cause_object_reconstruction");
00674   int rando = chaos().inclusive(1, 100);
00675   if (rando > CHANCE_OF_RECONSTRUCT) return;  // not doing it this time.
00676   LOG(istring("reconstructing client at ") + _server_loc.text_form());
00677   WHACK(_uplink);
00678   _uplink = new cromp_client(_server_loc);
00679 }
00680 
00681 int cromp_client_tester::execute()
00682 {
00683   FUNCDEF("execute");
00684 
00685   // testing that crompish pax are done right.
00686   bubble fud(randomizer().inclusive(12, 2829));
00687   byte_array packed_fud;
00688   fud.pack(packed_fud);
00689   if (packed_fud.length() != fud.packed_size())
00690     deadly_error(class_name(), func, "bubble's packed size method is wrong.");
00691 
00692   if (_encryption) _uplink->enable_encryption();
00693 
00694   outcome ret = _uplink->connect();
00695   if (ret != cromp_client::OKAY) {
00696     deadly_error(class_name(), func, istring("connection failed with error: ")
00697         + cromp_client::outcome_name(ret));
00698   }
00699 
00700   thread_cabinet cab;  // we store a bunch of threads here.
00701 
00702   LOG(isprintf("adding %d grabber threads to test.", _grabber_count));
00703 
00704   // create the extra grabber threads.
00705   for (int i = 0; i < _grabber_count; i++) {
00706     grabby_thread *to_add = new grabby_thread(*this);
00707     cab.add_thread(to_add, false, NIL);
00708   }
00709 
00710   LOG(isprintf("adding %d transmitter threads to test.", _thread_count));
00711 
00712   // create the specified number of threads.
00713   for (int j = 0; j < _thread_count; j++) {
00714     bitey_thread *to_add = new bitey_thread(*this);
00715     cab.add_thread(to_add, false, NIL);
00716   }
00717 
00718 //LOG("starting all threads...");
00719   time_stamp start;
00720   cab.start_all(NIL);
00721 //LOG("done starting threads...");
00722 
00723   portable::sleep_ms(400);  // wait until a few get cranked up.
00724 
00725 //LOG("did our initial sleep...");
00726 
00727   while (cab.any_running()) {
00728     portable::sleep_ms(30);
00729     if (!_threads_active) {
00730       break;
00731     }
00732 //LOG("main loop...");
00733     if (time_stamp(-_report_interval) > _last_report) {
00734       report(start, cromp_common::total_bytes_sent()
00735           + cromp_common::total_bytes_received(),
00736           _finished_loops);
00737       _last_report.reset();
00738     }
00739     if (_reconstruct_object) {
00740       cause_object_reconstruction();
00741     }
00742   }
00743 
00744   LOG("- done testing -");
00745 
00746   if (_finished_loops != double(_thread_count) * _send_count)
00747     LOG(isprintf("number of loops was calculated differently: wanted %d, "
00748         "got %d", _thread_count * _send_count, _finished_loops));
00749 
00750   report(start, cromp_common::total_bytes_sent()
00751       + cromp_common::total_bytes_received(),
00752       _thread_count * _send_count);
00753 
00754 //LOG("stopping all threads...");
00755   cab.stop_all();
00756   LOG("all threads exited.");
00757 
00758 #ifdef DEBUG_TESTER
00761 #endif
00762 
00763   BASE_LOG("cromp_client:: works for those functions tested.");
00764 
00765   return 0;
00766 }
00767 
00769 
// NOTE(review): presumably expands to the program's entry point, which
// creates a cromp_client_tester (with no constructor arguments, hence the
// empty second parameter) and runs it -- confirm against HOOPLE_MAIN's
// definition.
00770 HOOPLE_MAIN(cromp_client_tester, )
00771 

Generated on Fri Nov 21 04:30:00 2008 for HOOPLE Libraries by  doxygen 1.5.1