00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include <basis/byte_array.h>
00016 #include <basis/istring.h>
00017 #include <basis/portable.h>
00018 #include <cromp/cromp_client.h>
00019 #include <cromp/cromp_server.h>
00020 #include <mechanisms/time_stamp.h>
00021 #include <octopus/entity_defs.h>
00022 #include <octopus/tentacle.h>
00023 #include <octopus/unhandled_request.h>
00024 #include <opsystem/application_shell.h>
00025 #include <opsystem/command_line.h>
00026 #include <loggers/console_logger.h>
00027 #include <opsystem/directory_tree.h>
00028 #include <loggers/file_logger.h>
00029 #include <opsystem/filename_list.h>
00030 #include <data_struct/static_memory_gremlin.h>
00031 #include <sockets/address.h>
00032 #include <sockets/machine_uid.h>
00033 #include <sockets/tcpip_stack.h>
00034 #include <tentacles/file_transfer_tentacle.h>
00035
// Shorthand used throughout this file: sends 'a' through CLASS_EMERGENCY_LOG
// to the program-wide logger.
00036 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger(), a)
00037
// Period between the server's "There are %d clients." reports in execute().
00038 const int REPORTING_INTERVAL = 28 * SECOND_ms;
00039
// Passed to add_correspondence() when the server registers its key/root pair.
// NOTE(review): presumably how often the served tree is rescanned -- confirm.
00040 const int REFRESH_INTERVAL = 20 * MINUTE_ms;
00041
// Port used when no "port" flag is supplied on the command line.
00042 const int TRANSFER_PORT = 10808;
00043
00044
// First argument handed to each file_transfer_tentacle that is created.
// NOTE(review): presumably the largest chunk sent per transfer step -- confirm.
00045 const int MAX_CHUNK = 2 * MEGABYTE;
00046
00047
00048
00049
00051
00052 class transporter : public application_shell
00053 {
00054 public:
00055 transporter();
00056 ~transporter();
00057
00058 virtual int execute();
00059
00060 IMPLEMENT_CLASS_NAME("transporter");
00061
00062 int push_client_download();
00063
00064
00065 int print_instructions();
00066
00067
00068 private:
00069 bool _saw_clients;
00070 cromp_server *_server_side;
00071
00072 cromp_client *_client_side;
00073 bool _leave_when_no_clients;
00074 bool _encryption;
00075 istring _source;
00076 istring _target;
00077 bool _started_okay;
00078 };
00079
00081
00082 transporter::transporter()
00083 : application_shell("transporter"),
00084 _saw_clients(false),
00085 _server_side(NIL),
00086 _client_side(NIL),
00087 _leave_when_no_clients(false),
00088 _encryption(false),
00089 _started_okay(false)
00090 {
00091 FUNCDEF("constructor");
00092 SET_DEFAULT_COMBO_LOGGER;
00093 LOG("");
00094 LOG("");
00095
00096 command_line args(__argc, __argv);
00097
00098 istring port_text;
00099 int port = TRANSFER_PORT;
00100 if (args.get_value("port", port_text, false))
00101 port = port_text.convert(TRANSFER_PORT);
00102 int posn = 0;
00103 if (args.find("exit", posn)) {
00104 LOG("seeing the 'exit without clients' flag set.");
00105 _leave_when_no_clients = true;
00106 }
00107
00108 int indy = 0;
00109 if (args.find("encrypt", indy, false)
00110 || (args.find('e', indy, false)) ) {
00111 LOG("enabling encryption!");
00112
00113 _encryption = true;
00114 }
00115
00116 bool server = true;
00117 indy = 0;
00118 if (args.find("client", indy, false)) {
00119 LOG("client side chosen");
00120 server = false;
00121 }
00122
00123 internet_address addr;
00124 addr.port = port;
00125
00126
00127 istring hostname("local");
00128 istring host_temp;
00129 if (args.get_value("host", host_temp, false)) {
00130 LOG(istring("using host: ") + host_temp);
00131 hostname = host_temp;
00132 } else LOG(istring("using host: ") + hostname);
00133 strcpy(addr.hostname, hostname.s());
00134
00135 if (server) {
00136 istring key;
00137 if (!args.get_value("key", key, false)) {
00138 print_instructions();
00139 LOG("No keyword specified on command line.");
00140 return;
00141 }
00142 istring root;
00143 if (!args.get_value("root", root, false)) {
00144 print_instructions();
00145 LOG("No transfer root was specified on the command line.");
00146 return;
00147 }
00148
00149 LOG("starting transfer server");
00150 _server_side = new cromp_server(cromp_server::any_address(port));
00151 file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK, false);
00152 LOG(key + " => " + root);
00153 new_tent->add_correspondence(key, root, REFRESH_INTERVAL);
00154 _server_side->add_tentacle(new_tent);
00155 _server_side->enable_servers(_encryption);
00156 } else {
00157 LOG("starting transfer client");
00158 _client_side = new cromp_client(addr);
00159 if (_encryption) _client_side->enable_encryption();
00160
00161 outcome ret = _client_side->connect();
00162 if (ret != cromp_client::OKAY)
00163 non_continuable_error(class_name(), func, istring("failed to connect to "
00164 "the server: ") + cromp_client::outcome_name(ret));
00165
00166 file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK, false);
00167
00168 if (!args.get_value("source", _source, false)) {
00169 print_instructions();
00170 LOG("No source path was specified on the command line.");
00171 return;
00172 }
00173 if (!args.get_value("target", _target, false)) {
00174 print_instructions();
00175 LOG("No target path was specified on the command line.");
00176 return;
00177 }
00178
00179 string_array includes;
00180 outcome regis = new_tent->register_file_transfer
00181 (_client_side->entity(), _source, _target, includes);
00182 if (regis != cromp_client::OKAY)
00183 non_continuable_error(class_name(), func, "failed to register transfer");
00184
00185 _client_side->add_tentacle(new_tent);
00186 }
00187
00188 _started_okay = true;
00189
00190 }
00191
00192 transporter::~transporter()
00193 {
00194 WHACK(_client_side);
00195 WHACK(_server_side);
00196 }
00197
00198 int transporter::print_instructions()
00199 {
00200 istring name = filename(__argv[0]).basename().raw();
00201 log(isprintf("%s usage:", name.s()));
00202 log("");
00203 log(isprintf("\
00204 This program can transfer directory trees across the network. It will only\n\
00205 copy the files missing on the client's side given what the server offers.\n\
00206 The program can function as either the server side or the client side.\n\
00207 The available flags are:\n\
00208 \n\
00209 %s --client --host srvname --port P --source key_path --target cli_dest\n\
00210 \n\
00211 The client side needs to know the server host (srvname) and the port where\n\
00212 the server is listening for connections (P). The client will compare its\n\
00213 local path (cli_dest) with the server's keyed path (key_path) and copy the\n\
00214 files that are missing on the client's side. The key path will begin with\n\
00215 whatever keyword the server is offering, plus optional additional path\n\
00216 components to retrieve less than the whole tree being served.\n\
00217 \n\
00218 \n\
00219 %s --server --host srvname --port P --key keyname --root srv_path\n\
00220 \n\
00221 The server side needs to know what address and port to listen on (srvname\n\
00222 and P). It will open a server there that provides a directory hierarchy\n\
00223 starting at the root specified (srv_path). The directory tree will be known\n\
00224 to clients as the key word (keyname), thus freeing the clients from needing\n\
00225 to know absolute paths on the server.\n\
00226 \n\
00227 ", name.s(), name.s()));
00228
00229 return 23;
00230 }
00231
00232 int transporter::push_client_download()
00233 {
00234 FUNCDEF("push_client_download");
00235
00236 file_transfer_infoton initiate;
00237 initiate._request = true;
00238 initiate._command = file_transfer_infoton::TREE_COMPARISON;
00239 initiate._src_root = _source;
00240 initiate._dest_root = _target;
00241 directory_tree target_area(_target);
00242 target_area.calculate();
00243 string_set includes;
00244 initiate.package_tree_info(target_area, includes);
00245 octopus_request_id cmd_id;
00246 outcome start_ret = _client_side->submit(initiate, cmd_id);
00247 if (start_ret != tentacle::OKAY)
00248 non_continuable_error(class_name(), func, istring("failed to initiate "
00249 " the transfer: ") + cromp_client::outcome_name(start_ret));
00250
00251 infoton *start_reply_tmp = NIL;
00252
00253 outcome first_receipt = _client_side->acquire(start_reply_tmp, cmd_id);
00254 if (first_receipt != cromp_client::OKAY)
00255 non_continuable_error(class_name(), func, istring("failed to receive response: ")
00256 + cromp_client::outcome_name(start_ret));
00257 file_transfer_infoton *start_reply = dynamic_cast<file_transfer_infoton *>
00258 (start_reply_tmp);
00259 if (!start_reply)
00260 non_continuable_error(class_name(), func, "failed to cast starting infoton to "
00261 "proper type");
00262
00263
00264 filename_list diffs;
00265 byte_array pack_copy = start_reply->_packed_data;
00266 if (!diffs.unpack(pack_copy))
00267 non_continuable_error(class_name(), func, "could not unpack filename list!");
00268 LOG(istring("got list of diffs:\n") + diffs.text_form());
00269
00270
00271 outcome eval_ret = _client_side->octo()->evaluate(start_reply, cmd_id, true);
00272 if (eval_ret != cromp_client::OKAY)
00273 non_continuable_error(class_name(), func, istring("failed to process the "
00274 "start response: ") + cromp_client::outcome_name(eval_ret));
00275
00276 int iter = 0;
00277
00278 while (true) {
00279 LOG(isprintf("ongoing chunk %d", ++iter));
00280
00281 file_transfer_infoton ongoing;
00282 ongoing._request = true;
00283 ongoing._command = file_transfer_infoton::PLACE_FILE_CHUNKS;
00284 ongoing._src_root = _source;
00285 ongoing._dest_root = _target;
00286
00287 octopus_request_id cmd_id;
00288 outcome place_ret = _client_side->submit(ongoing, cmd_id);
00289 if (place_ret != cromp_client::OKAY)
00290 non_continuable_error(class_name(), func, istring("failed to send ongoing "
00291 "chunk: ") + cromp_client::outcome_name(place_ret));
00292
00293 infoton *place_reply_tmp = NIL;
00294
00295 outcome place_receipt = _client_side->acquire(place_reply_tmp, cmd_id);
00296 if (place_receipt != cromp_client::OKAY)
00297 non_continuable_error(class_name(), func, istring("failed to receive "
00298 "place response: ") + cromp_client::outcome_name(place_receipt));
00299
00300 file_transfer_infoton *place_reply = dynamic_cast<file_transfer_infoton *>
00301 (place_reply_tmp);
00302 if (!place_reply) {
00303 if (dynamic_cast<unhandled_request *>(place_reply_tmp)) {
00304 log("The server does not support file transfers.");
00305 WHACK(place_reply_tmp);
00306 break;
00307 }
00308 non_continuable_error(class_name(), func, "failed to cast storage reply infoton "
00309 "to proper type");
00310 }
00311
00312 int reply_size = place_reply->_packed_data.length();
00313
00314 outcome eval_ret2 = _client_side->octo()->evaluate(place_reply, cmd_id, true);
00315 if (eval_ret2 != tentacle::OKAY)
00316 non_continuable_error(class_name(), func, istring("failed to process the "
00317 "place response: ") + cromp_client::outcome_name(eval_ret2));
00318
00319 if (!reply_size) {
00320 LOG("hit termination condition: no data packed in for file chunks.");
00321 break;
00322 }
00323 }
00324 return 0;
00325 }
00326
00327 int transporter::execute()
00328 {
00329 FUNCDEF("execute");
00330
00331 if (!_started_okay) return 32;
00332
00333 time_stamp next_report(REPORTING_INTERVAL);
00334
00335 while (true) {
00336
00337
00338 if (_server_side && !_server_side->clients() && _leave_when_no_clients
00339 && _saw_clients) {
00340 LOG("exiting now");
00341 break;
00342 }
00343
00344 if (_client_side) return push_client_download();
00345
00346 if (time_stamp() > next_report) {
00347 if (_server_side)
00348 LOG(isprintf("There are %d clients.", _server_side->clients()));
00349
00350 next_report.reset(REPORTING_INTERVAL);
00351 }
00352
00353 portable::sleep_ms(100);
00354 }
00355 return 0;
00356 }
00357
00359
// NOTE(review): HOOPLE_MAIN presumably expands to the program's entry point,
// which creates a transporter and runs it -- confirm against the macro's
// definition.
00360 HOOPLE_MAIN(transporter, )
00361