transporter.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : transporter                                                       *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include <basis/byte_array.h>
00016 #include <basis/istring.h>
00017 #include <basis/portable.h>
00018 #include <cromp/cromp_client.h>
00019 #include <cromp/cromp_server.h>
00020 #include <mechanisms/time_stamp.h>
00021 #include <octopus/entity_defs.h>
00022 #include <octopus/tentacle.h>
00023 #include <octopus/unhandled_request.h>
00024 #include <opsystem/application_shell.h>
00025 #include <opsystem/command_line.h>
00026 #include <loggers/console_logger.h>
00027 #include <opsystem/directory_tree.h>
00028 #include <loggers/file_logger.h>
00029 #include <opsystem/filename_list.h>
00030 #include <data_struct/static_memory_gremlin.h>
00031 #include <sockets/address.h>
00032 #include <sockets/machine_uid.h>
00033 #include <sockets/tcpip_stack.h>
00034 #include <tentacles/file_transfer_tentacle.h>
00035 
00036 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger(), a)
00037 
00038 const int REPORTING_INTERVAL = 28 * SECOND_ms;  // how often to squawk.
00039 
00040 const int REFRESH_INTERVAL = 20 * MINUTE_ms;  // how often we check tree.
00041 
00042 const int TRANSFER_PORT = 10808;
00043   // simple port grabbed randomly for the default.
00044 
00045 const int MAX_CHUNK = 2 * MEGABYTE;
00046   // we will transfer fairly large chunks so we can get this done reasonably
00047   // quickly.  even at that size, it shouldn't cause most modern machines to
00048   // hiccup even slightly.
00049 
00051 
00052 class transporter : public application_shell
00053 {
00054 public:
00055   transporter();
00056   ~transporter();
00057 
00058   virtual int execute();
00059 
00060   IMPLEMENT_CLASS_NAME("transporter");
00061 
00062   int push_client_download();
00063     // for a client side download, this prods the transfer process.
00064 
00065   int print_instructions();
00066     // shows the instructions for this application.
00067 
00068 private:
00069   bool _saw_clients;  // true if we ever got a connection.
00070   cromp_server *_server_side;
00071     // provides connection and transmission services for servers.
00072   cromp_client *_client_side;  // client side connection.
00073   bool _leave_when_no_clients;  // true if we should just do one run.
00074   bool _encryption;  // true if we're encrypting.
00075   istring _source;  // the source path which a client will ask the server for.
00076   istring _target;  // the target path where files are stored for the client.
00077   bool _started_okay;  // true if we got past the command line checks.
00078 };
00079 
00081 
00082 transporter::transporter()
00083 : application_shell("transporter"),
00084   _saw_clients(false),
00085   _server_side(NIL),
00086   _client_side(NIL),
00087   _leave_when_no_clients(false),
00088   _encryption(false),
00089   _started_okay(false)
00090 {
00091   FUNCDEF("constructor");
00092   SET_DEFAULT_COMBO_LOGGER;
00093   LOG("");
00094   LOG("");
00095 
00096   command_line args(__argc, __argv);
00097   // check for a port on the command line.
00098   istring port_text;
00099   int port = TRANSFER_PORT;
00100   if (args.get_value("port", port_text, false))
00101     port = port_text.convert(TRANSFER_PORT);
00102   int posn = 0;
00103   if (args.find("exit", posn)) {
00104     LOG("seeing the 'exit without clients' flag set.");
00105     _leave_when_no_clients = true;
00106   }
00107 
00108   int indy = 0;
00109   if (args.find("encrypt", indy, false)
00110       || (args.find('e', indy, false)) ) {
00111 LOG("enabling encryption!");
00112     // they're saying that we should encrypt the communication.
00113     _encryption = true;
00114   }
00115 
00116   bool server = true;
00117   indy = 0;
00118   if (args.find("client", indy, false)) {
00119 LOG("client side chosen");
00120     server = false;
00121   }
00122 
00123   internet_address addr;
00124   addr.port = port;
00125 
00126   // check for a hostname on the command line.
00127   istring hostname("local");
00128   istring host_temp;
00129   if (args.get_value("host", host_temp, false)) {
00130     LOG(istring("using host: ") + host_temp);
00131     hostname = host_temp;
00132   } else LOG(istring("using host: ") + hostname);
00133   strcpy(addr.hostname, hostname.s());
00134 
00135   if (server) {
00136     istring key;
00137     if (!args.get_value("key", key, false)) {
00138       print_instructions();
00139       LOG("No keyword specified on command line.");
00140       return;
00141     }
00142     istring root;
00143     if (!args.get_value("root", root, false)) {
00144       print_instructions();
00145       LOG("No transfer root was specified on the command line.");
00146       return;
00147     }
00148 
00149     LOG("starting transfer server");
00150     _server_side = new cromp_server(cromp_server::any_address(port));
00151     file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK, false);
00152 LOG(key + " => " + root);
00153     new_tent->add_correspondence(key, root, REFRESH_INTERVAL);
00154     _server_side->add_tentacle(new_tent);
00155     _server_side->enable_servers(_encryption);
00156   } else {
00157     LOG("starting transfer client");
00158     _client_side = new cromp_client(addr);
00159     if (_encryption) _client_side->enable_encryption();
00160 
00161     outcome ret = _client_side->connect();
00162     if (ret != cromp_client::OKAY)
00163       non_continuable_error(class_name(), func, istring("failed to connect to "
00164           "the server: ") + cromp_client::outcome_name(ret));
00165 
00166     file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK, false);
00167 
00168     if (!args.get_value("source", _source, false)) {
00169       print_instructions();
00170       LOG("No source path was specified on the command line.");
00171       return;
00172     }
00173     if (!args.get_value("target", _target, false)) {
00174       print_instructions();
00175       LOG("No target path was specified on the command line.");
00176       return;
00177     }
00178 
00179     string_array includes;
00180     outcome regis = new_tent->register_file_transfer
00181         (_client_side->entity(), _source, _target, includes);
00182     if (regis != cromp_client::OKAY)
00183       non_continuable_error(class_name(), func, "failed to register transfer");
00184 
00185     _client_side->add_tentacle(new_tent);
00186   }
00187 
00188   _started_okay = true;
00189 
00190 }
00191 
00192 transporter::~transporter()
00193 {
00194   WHACK(_client_side);
00195   WHACK(_server_side);
00196 }
00197 
00198 int transporter::print_instructions()
00199 {
00200   istring name = filename(__argv[0]).basename().raw();
00201   log(isprintf("%s usage:", name.s()));
00202   log("");
00203   log(isprintf("\
00204 This program can transfer directory trees across the network.  It will only\n\
00205 copy the files missing on the client's side given what the server offers.\n\
00206 The program can function as either the server side or the client side.\n\
00207 The available flags are:\n\
00208 \n\
00209 %s --client --host srvname --port P --source key_path --target cli_dest\n\
00210 \n\
00211 The client side needs to know the server host (srvname) and the port where\n\
00212 the server is listening for connections (P).  The client will compare its\n\
00213 local path (cli_dest) with the server's keyed path (key_path) and copy the\n\
00214 files that are missing on the client's side.  The key path will begin with\n\
00215 whatever keyword the server is offering, plus optional additional path\n\
00216 components to retrieve less than the whole tree being served.\n\
00217 \n\
00218 \n\
00219 %s --server --host srvname --port P --key keyname --root srv_path\n\
00220 \n\
00221 The server side needs to know what address and port to listen on (srvname\n\
00222 and P).  It will open a server there that provides a directory hierarchy\n\
00223 starting at the root specified (srv_path).  The directory tree will be known\n\
00224 to clients as the key word (keyname), thus freeing the clients from needing\n\
00225 to know absolute paths on the server.\n\
00226 \n\
00227 ", name.s(), name.s()));
00228 
00229   return 23;
00230 }
00231 
00232 int transporter::push_client_download()
00233 {
00234   FUNCDEF("push_client_download");
00235   // prepare a client request
00236   file_transfer_infoton initiate;
00237   initiate._request = true;
00238   initiate._command = file_transfer_infoton::TREE_COMPARISON;
00239   initiate._src_root = _source;
00240   initiate._dest_root = _target;
00241   directory_tree target_area(_target);
00242   target_area.calculate();
00243   string_set includes;
00244   initiate.package_tree_info(target_area, includes);
00245   octopus_request_id cmd_id;
00246   outcome start_ret = _client_side->submit(initiate, cmd_id);
00247   if (start_ret != tentacle::OKAY)
00248     non_continuable_error(class_name(), func, istring("failed to initiate "
00249         " the transfer: ") + cromp_client::outcome_name(start_ret));
00250 
00251   infoton *start_reply_tmp = NIL;
00252 //hmmm: set timeout appropriate to the speed of the connection!
00253   outcome first_receipt = _client_side->acquire(start_reply_tmp, cmd_id);
00254   if (first_receipt != cromp_client::OKAY)
00255     non_continuable_error(class_name(), func, istring("failed to receive response: ")
00256         + cromp_client::outcome_name(start_ret));
00257   file_transfer_infoton *start_reply = dynamic_cast<file_transfer_infoton *>
00258       (start_reply_tmp);
00259   if (!start_reply)
00260     non_continuable_error(class_name(), func, "failed to cast starting infoton to "
00261         "proper type");
00262 
00263 //debugging start
00264   filename_list diffs;
00265   byte_array pack_copy = start_reply->_packed_data;
00266   if (!diffs.unpack(pack_copy))
00267     non_continuable_error(class_name(), func, "could not unpack filename list!");
00268   LOG(istring("got list of diffs:\n") + diffs.text_form());
00269 //debugging end
00270 
00271   outcome eval_ret = _client_side->octo()->evaluate(start_reply, cmd_id, true);
00272   if (eval_ret != cromp_client::OKAY)
00273     non_continuable_error(class_name(), func, istring("failed to process the "
00274         "start response: ") + cromp_client::outcome_name(eval_ret));
00275 
00276   int iter = 0;
00277 
00278   while (true) {
00279 LOG(isprintf("ongoing chunk %d", ++iter));
00280     // keep going until we find a broken reply.
00281     file_transfer_infoton ongoing;
00282     ongoing._request = true;
00283     ongoing._command = file_transfer_infoton::PLACE_FILE_CHUNKS;
00284     ongoing._src_root = _source;
00285     ongoing._dest_root = _target;
00286 
00287     octopus_request_id cmd_id;
00288     outcome place_ret = _client_side->submit(ongoing, cmd_id);
00289     if (place_ret != cromp_client::OKAY)
00290       non_continuable_error(class_name(), func, istring("failed to send ongoing "
00291           "chunk: ") + cromp_client::outcome_name(place_ret));
00292 
00293     infoton *place_reply_tmp = NIL;
00294 //hmmm: set timeout appropriate to the speed of the connection!
00295     outcome place_receipt = _client_side->acquire(place_reply_tmp, cmd_id);
00296     if (place_receipt != cromp_client::OKAY)
00297       non_continuable_error(class_name(), func, istring("failed to receive "
00298           "place response: ") + cromp_client::outcome_name(place_receipt));
00299 
00300     file_transfer_infoton *place_reply = dynamic_cast<file_transfer_infoton *>
00301         (place_reply_tmp);
00302     if (!place_reply) {
00303       if (dynamic_cast<unhandled_request *>(place_reply_tmp)) {
00304         log("The server does not support file transfers.");
00305         WHACK(place_reply_tmp);
00306         break;
00307       }
00308       non_continuable_error(class_name(), func, "failed to cast storage reply infoton "
00309           "to proper type");
00310     }
00311 
00312     int reply_size = place_reply->_packed_data.length();
00313 
00314     outcome eval_ret2 = _client_side->octo()->evaluate(place_reply, cmd_id, true);
00315     if (eval_ret2 != tentacle::OKAY)
00316       non_continuable_error(class_name(), func, istring("failed to process the "
00317           "place response: ") + cromp_client::outcome_name(eval_ret2));
00318 
00319     if (!reply_size) {
00320       LOG("hit termination condition: no data packed in for file chunks.");
00321       break;
00322     }
00323   }
00324   return 0;
00325 }
00326 
00327 int transporter::execute()
00328 {
00329   FUNCDEF("execute");
00330 
00331   if (!_started_okay) return 32;
00332 
00333   time_stamp next_report(REPORTING_INTERVAL);
00334 
00335   while (true) {
00336     // make sure we didn't see our exit condition.
00337 
00338     if (_server_side && !_server_side->clients() && _leave_when_no_clients
00339         && _saw_clients) {
00340       LOG("exiting now");
00341       break;
00342     }
00343 
00344     if (_client_side) return push_client_download();
00345 
00346     if (time_stamp() > next_report) {
00347       if (_server_side)
00348         LOG(isprintf("There are %d clients.", _server_side->clients()));
00349 //report about client side also.
00350       next_report.reset(REPORTING_INTERVAL);
00351     }
00352 
00353     portable::sleep_ms(100); 
00354   }
00355   return 0;
00356 }
00357 
00359 
00360 HOOPLE_MAIN(transporter, )
00361 

Generated on Fri Nov 28 04:28:51 2008 for HOOPLE Libraries by  doxygen 1.5.1