file_transfer_tentacle.cpp

Go to the documentation of this file.
00001 #ifndef FILE_TRANSFER_TENTACLE_IMPLEMENTATION_FILE
00002 #define FILE_TRANSFER_TENTACLE_IMPLEMENTATION_FILE
00003 
00004 /*****************************************************************************\
00005 *                                                                             *
00006 *  Name   : file_transfer_tentacle                                            *
00007 *  Author : Chris Koeritz                                                     *
00008 *                                                                             *
00009 *******************************************************************************
00010 * Copyright (c) 2005-$now By Author.  This program is free software; you can  *
00011 * redistribute it and/or modify it under the terms of the GNU General Public  *
00012 * License as published by the Free Software Foundation; either version 2 of   *
00013 * the License or (at your option) any later version.  This is online at:      *
00014 *     http://www.fsf.org/copyleft/gpl.html                                    *
00015 * Please send any updates to: fred@gruntose.com                               *
00016 \*****************************************************************************/
00017 
00018 #include "file_transfer_tentacle.h"
00019 
00020 #include <basis/auto_synch.h>
00021 #include <basis/log_base.h>
00022 #include <basis/mutex.h>
00023 #include <mechanisms/ithread.h>
00024 #include <octopus/entity_defs.h>
00025 #include <octopus/unhandled_request.h>
00026 #include <opsystem/directory_tree.h>
00027 #include <opsystem/filename.h>
00028 #include <opsystem/filename_list.h>
00029 #include <opsystem/heavy_file_ops.h>
00030 
00031 #undef AUTO_LOCK
00032 #define AUTO_LOCK auto_synchronizer loc(*_lock);
00033   // protects our lists.
00034 
00035 const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms;
00036   // this is how frequently we clean up the list to remove outdated transfers.
00037 
00038 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
00039   // if it hasn't been touched in this long, it's out of there.
00040 
00041 #define DEBUG_FILE_TRANSFER_TENTACLE
00042   // uncomment for noisier version.
00043 
00044 #undef LOG
00045 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
00046 
00048 
00049 class file_transfer_record 
00050 {
00051 public:
00052   // valid for both transfers and correspondences.
00053   istring _src_root;  // where the info is on the data provider.
00054   time_stamp _last_active;  // when this was last used.
00055 
00056   // valid for file transfers only.
00057   octopus_entity _ent;  // the entity requesting this service.
00058   istring _dest_root;  // where the info is on the data sink.
00059   filename_list *_diffs;  // the differences to be transferred.
00060   file_transfer_header _last_sent;  // the last chunk that was sent.
00061   bool _done;  // true if the transfer is finished.
00062   string_array _includes;  // the set to include.
00063 
00064   // valid for correspondence records only.
00065   directory_tree *_local_dir;  // our local information about the transfer.
00066   istring _source_mapping;  // valid for a correspondence record.
00067   int _refresh_interval;  // the rate of refreshing the source tree.
00068 
00069   file_transfer_record() : _diffs(NIL), _done(false), _local_dir(NIL) {}
00070 
00071   ~file_transfer_record() {
00072     WHACK(_local_dir);
00073     WHACK(_diffs);
00074   }
00075 
00076   istring text_form() const {
00077     istring to_return;
00078     to_return += istring("src=") + _src_root + istring(" last act=")
00079         + _last_active.text_form();
00080     if (_ent.blank()) to_return += istring(" ent=") + _ent.text_form();
00081     if (_dest_root.t()) {
00082       to_return += istring(" dest=") + _dest_root;
00083       to_return += istring(" last_sent=") + _last_sent.text_form();
00084     }
00085     return to_return;
00086   }
00087 };
00088 
00090 
00091 // this implementation assumes that the same entity will never simultaneously
00092 // transfer the same source to the same destination.  that assumption holds
00093 // up fine for different clients, since they should have different entities.
00094 // when there is a collision on the entity/src/dest, then the default action
00095 // is to assume that the transfer is just being started over.
00096 
00097 class file_transfer_status : public amorph<file_transfer_record>
00098 {
00099 public:
00100   // find a transfer record by the key fields.
00101   file_transfer_record *find(const octopus_entity &ent, const istring &src,
00102       const istring &dest) {
00103     for (int i = 0; i < elements(); i++) {
00104       const file_transfer_record *rec = get(i);
00105       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
00106           && (rec->_dest_root == dest) ) {
00107         return borrow(i);
00108       }
00109     }
00110     return NIL;
00111   }
00112 
00113   virtual ~file_transfer_status() {}
00114 
00115   IMPLEMENT_CLASS_NAME("file_transfer_status");
00116 
00117   // find a file correspondence record by the mapping name.
00118   file_transfer_record *find_mapping(const istring &source_mapping) {
00119     for (int i = 0; i < elements(); i++) {
00120       const file_transfer_record *rec = get(i);
00121       if (rec && (rec->_source_mapping == source_mapping) )
00122         return borrow(i);
00123     }
00124     return NIL;
00125   }
00126 
00127   // turns a source mapping into the location that it corresponds to.
00128   istring translate(const istring &source_path) const {
00129     FUNCDEF("translate");
00130     string_array pieces;
00131     filename(source_path).separate(pieces);
00132     istring source_mapping = pieces[0];
00133     pieces.zap(0, 0);  // remove source part.
00134 
00135     for (int i = 0; i < elements(); i++) {
00136       const file_transfer_record *rec = get(i);
00137       if (rec && (rec->_source_mapping == source_mapping) ) {
00138         return rec->_src_root;
00139       }
00140     }
00141     return istring::empty_string();
00142   }
00143 
00144   // removes a file transfer record by the key fields.
00145   bool whack(const octopus_entity &ent, const istring &src,
00146       const istring &dest) {
00147     for (int i = 0; i < elements(); i++) {
00148       const file_transfer_record *rec = get(i);
00149       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
00150           && (rec->_dest_root == dest) ) {
00151         zap(i, i);
00152         return true;
00153       }
00154     }
00155     return false;
00156   }
00157 
00158   // clean all records for the entity "ent".
00159   void whack_all(const octopus_entity &ent) {
00160     for (int i = elements() - 1; i >= 0; i--) {
00161       const file_transfer_record *rec = get(i);
00162       if (rec && (rec->_ent == ent) )
00163         zap(i, i);
00164     }
00165   }
00166 
00167   // removes a file transfer correspondence.
00168   bool whack_mapping(const istring &source_mapping) {
00169     for (int i = elements() - 1; i >= 0; i--) {
00170       const file_transfer_record *rec = get(i);
00171       if (rec && (rec->_source_mapping == source_mapping) ) {
00172         zap(i, i);
00173         return true;
00174       }
00175     }
00176     return false;
00177   }
00178 
00179   // returns a string dump of the fields in this list.
00180   istring text_form() const {
00181     istring to_return;
00182     for (int i = 0; i < elements(); i++) {
00183       const file_transfer_record *rec = get(i);
00184       if (rec)
00185         to_return += rec->text_form() + log_base::platform_ending();
00186     }
00187     return to_return;
00188   }
00189 };
00190 
00192 
00193 class file_transfer_cleaner : public ithread
00194 {
00195 public:
00196   file_transfer_cleaner(file_transfer_tentacle &parent)
00197       : ithread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
00198 
00199   virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
00200 
00201 private:
00202   file_transfer_tentacle &_parent;
00203 };
00204 
00206 
00207 file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
00208     bool only_provide_comparison)
00209 : tentacle_helper<file_transfer_infoton>
00210       (file_transfer_infoton::file_transfer_classifier(), false),
00211   _maximum_transfer(maximum_transfer),
00212   _transfers(new file_transfer_status),
00213   _correspondences(new file_transfer_status),
00214   _lock(new mutex),
00215   _cleaner(new file_transfer_cleaner(*this)),
00216   _only_comparisons(only_provide_comparison)
00217 {
00218   _cleaner->start(NIL);
00219 }
00220 
00221 file_transfer_tentacle::~file_transfer_tentacle()
00222 {
00223   _cleaner->stop();
00224   WHACK(_transfers);
00225   WHACK(_correspondences);
00226   WHACK(_cleaner);
00227   WHACK(_lock);
00228 }
00229 
00230 istring file_transfer_tentacle::text_form() const
00231 {
00232   AUTO_LOCK;
00233   return _transfers->text_form();
00234 }
00235 
00236 void file_transfer_tentacle::expunge(const octopus_entity &to_remove)
00237 {
00238   AUTO_LOCK;
00239   _transfers->whack_all(to_remove);
00240 }
00241 
00242 outcome file_transfer_tentacle::add_correspondence
00243     (const istring &source_mapping, const istring &source_root,
00244      int refresh_interval)
00245 {
00246   FUNCDEF("add_correspondence");
00247   AUTO_LOCK;
00248 
00249   remove_correspondence(source_mapping);  // clean the old one out first.
00250 
00251   // create new file transfer record to hold this correspondence.
00252   file_transfer_record *new_record = new file_transfer_record;
00253   new_record->_source_mapping = source_mapping;
00254   new_record->_src_root = source_root;
00255   new_record->_refresh_interval = refresh_interval;
00256   new_record->_local_dir = new directory_tree(source_root);
00257 //hmmm: doesn't say anything about a pattern.  do we need to worry about that?
00258 
00259   // check that the directory looked healthy.
00260   if (!new_record->_local_dir->good()) {
00261     WHACK(new_record);
00262     return common::ACCESS_DENIED;
00263   }
00264 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00265   LOG(istring("adding tree for: ent=") + new_record->_ent.text_form()
00266       + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
00267 #endif
00268   // calculate size and checksum info for the directory.
00269   new_record->_local_dir->calculate();
00270 
00271 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00272   LOG(istring("done adding tree for: ent=") + new_record->_ent.text_form()
00273       + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
00274 #endif
00275 
00276   _correspondences->append(new_record);
00277 
00278   return OKAY;
00279 }
00280 
00281 outcome file_transfer_tentacle::remove_correspondence
00282     (const istring &source_mapping)
00283 {
00284   AUTO_LOCK;
00285   if (!_correspondences->whack_mapping(source_mapping))
00286     return NOT_FOUND;
00287   return OKAY;
00288 }
00289 
00290 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
00291     const istring &src, const istring &dest, filename_list &diffs)
00292 {
00293   FUNCDEF("get_differences");
00294   diffs.reset();
00295   AUTO_LOCK;
00296   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
00297   if (!the_rec) return false;
00298   if (!the_rec->_diffs) return false;  // no diffs listed.
00299   diffs = *the_rec->_diffs;
00300   return true;
00301 }
00302 
00303 bool file_transfer_tentacle::status(const octopus_entity &ent,
00304     const istring &src, const istring &dest, double &total_size,
00305     int &total_files, double &current_size, int &current_files, bool &done,
00306     time_stamp &last_active)
00307 {
00308   FUNCDEF("status");
00309   total_size = 0;
00310   total_files = 0;
00311   current_files = 0;
00312   current_size = 0;
00313   AUTO_LOCK;
00314   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
00315   if (!the_rec) return false;
00316   done = the_rec->_done;
00317   last_active = the_rec->_last_active;
00318 
00319   if (the_rec->_diffs) {
00320     the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
00321         the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
00322         current_files, current_size);
00323     total_files = the_rec->_diffs->total_files();
00324     total_size = the_rec->_diffs->total_size();
00325   }
00326 
00327   return true;
00328 }
00329 
00330 outcome file_transfer_tentacle::register_file_transfer
00331     (const octopus_entity &ent, const istring &src_root,
00332     const istring &dest_root, const string_array &includes)
00333 {
00334   FUNCDEF("register_file_transfer");
00335   AUTO_LOCK;
00336   // make sure that this isn't an existing transfer.  if so, we just update
00337   // the status.
00338   file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
00339   if (!the_rec) {
00340     the_rec = new file_transfer_record;
00341     the_rec->_src_root = src_root;
00342     the_rec->_dest_root = dest_root;
00343     the_rec->_ent = ent;
00344     the_rec->_includes = includes;
00345     _transfers->append(the_rec);  // add the new record.
00346   } else {
00347     the_rec->_done = false;
00348     the_rec->_includes = includes;
00349     the_rec->_last_active.reset();  // freshen up the last activity time.
00350   }
00351   return OKAY;
00352 }
00353 
00354 outcome file_transfer_tentacle::cancel_file_transfer(const octopus_entity &ent,
00355     const istring &src_root, const istring &dest_root)
00356 {
00357   AUTO_LOCK;
00358   return _transfers->whack(ent, src_root, dest_root)?  OKAY : NOT_FOUND;
00359 }
00360 
00361 directory_tree *file_transfer_tentacle::lock_directory(const istring &key)
00362 {
00363   _lock->lock();
00364   file_transfer_record *the_rec = _correspondences->find_mapping(key);
00365   if (!the_rec || !the_rec->_local_dir) {
00366     _lock->unlock();
00367     return NIL;  // unknown transfer.
00368   }
00369   return the_rec->_local_dir;
00370 }
00371 
00372 void file_transfer_tentacle::unlock_directory()
00373 {
00374   _lock->unlock();
00375 }
00376 
00377 bool file_transfer_tentacle::add_path(const istring &key,
00378     const istring &new_path)
00379 {
00380   AUTO_LOCK;
00381   file_transfer_record *the_rec = _correspondences->find_mapping(key);
00382   if (!the_rec) return false;  // unknown transfer.
00383   if (!the_rec->_local_dir) return false;  // not right type.
00384   return the_rec->_local_dir->add_path(new_path) == common::OKAY;
00385 }
00386 
00387 bool file_transfer_tentacle::remove_path(const istring &key,
00388     const istring &old_path)
00389 {
00390   AUTO_LOCK;
00391   file_transfer_record *the_rec = _correspondences->find_mapping(key);
00392   if (!the_rec) return false;  // unknown transfer.
00393   if (!the_rec->_local_dir) return false;  // not right type.
00394   return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
00395 }
00396 
00397 void file_transfer_tentacle::periodic_actions()
00398 {
00399   FUNCDEF("periodic_actions");
00400   AUTO_LOCK;
00401 
00402   // first, we'll clean out old transfers.
00403   time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
00404     // nothing older than this should be kept.
00405   for (int i = _transfers->elements() - 1; i >= 0; i--) {
00406     const file_transfer_record *curr = _transfers->get(i);
00407     if (curr->_last_active < oldest_allowed) {
00408 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00409       LOG(istring("cleaning record for: ent=") + curr->_ent.text_form()
00410           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
00411 #endif
00412       _transfers->zap(i, i);
00413     }
00414   }
00415 
00416   // then we'll rescan any trees that are ready for it.
00417   for (int i = 0; i < _correspondences->elements(); i++) {
00418     file_transfer_record *curr = _correspondences->borrow(i);
00419     if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
00420       if (curr->_local_dir) {
00421 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00422         LOG(istring("refreshing tree for: ent=") + curr->_ent.text_form()
00423             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
00424 #endif
00425         WHACK(curr->_local_dir);
00426         curr->_local_dir = new directory_tree(curr->_src_root);
00427         curr->_local_dir->calculate();
00428 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00429         LOG(istring("done refreshing tree for: ent=") + curr->_ent.text_form()
00430             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
00431 #endif
00432       }
00433       curr->_last_active.reset();  // reset our action time.
00434     }
00435   }
00436 }
00437 
00438 outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
00439     byte_array &packed_form, infoton * &reformed)
00440 {
00441   // this method doesn't use the lists, so it doesn't need locking.
00442   if (classifier != file_transfer_infoton::file_transfer_classifier())
00443     return NO_HANDLER;
00444   return reconstituter(classifier, packed_form, reformed,
00445       (file_transfer_infoton *)NIL);
00446 }
00447 
00448 // the "handle_" methods are thread-safe because the mutex is locked before
00449 // their invocations.
00450 
00451 outcome file_transfer_tentacle::handle_tree_compare_request
00452     (file_transfer_infoton &req, const octopus_request_id &item_id)
00453 {
00454   FUNCDEF("handle_tree_compare_request");
00455 
00456   // get the mapping from the specified location on this side.
00457   filename splitting(req._src_root);
00458   string_array pieces;
00459   splitting.separate(pieces);
00460   istring source_mapping = pieces[0];
00461 
00462   // patch the name up to find the sub_path for the source.
00463   filename source_start;
00464   pieces.zap(0, 0);
00465   source_start.join(pieces);
00466 
00467   // locate the allowed transfer depot for the mapping they provided.
00468   file_transfer_record *mapping_record
00469       = _correspondences->find_mapping(source_mapping);
00470   if (!mapping_record) {
00471     LOG(istring("could not find source mapping of ") + source_mapping);
00472     return NOT_FOUND;
00473   }
00474 
00475   // unpack the tree that they sent us which describes their local area.
00476   directory_tree *dest_tree = new directory_tree;
00477   if (!dest_tree->unpack(req._packed_data)) {
00478     LOG(istring("could not unpack requester's directory tree"));
00479     WHACK(dest_tree);
00480     return GARBAGE;
00481   }
00482 
00483   string_array requested_names;
00484   if (!requested_names.unpack(req._packed_data)) {
00485     LOG(istring("could not unpack requester's filename includes"));
00486     WHACK(dest_tree);
00487     return GARBAGE;
00488   }
00489 
00490   // look up to see if this is about something that has already been seen.
00491   // we don't want to add a new transfer record if they're already working on
00492   // this.  that also lets them do a new tree compare to restart the transfer.
00493   file_transfer_record *the_rec = _transfers->find(item_id._entity,
00494       req._src_root, req._dest_root);
00495   if (!the_rec) {
00496     // there was no existing record; we'll create a new one.
00497     the_rec = new file_transfer_record;
00498     the_rec->_ent = item_id._entity;
00499     the_rec->_src_root = req._src_root;
00500     the_rec->_dest_root = req._dest_root;
00501     _transfers->append(the_rec);
00502   } else {
00503     // record some activity on this record.
00504     the_rec->_done = false;
00505     the_rec->_last_active.reset();
00506   }
00507 
00508   the_rec->_diffs = new filename_list;
00509 
00510   // compare the two trees of files.
00511   directory_tree::compare_trees(*mapping_record->_local_dir,
00512       source_start.raw(), *dest_tree, istring::empty_string(),
00513       *the_rec->_diffs);
00514 
00515   // now prune the diffs to accord with what they claim they want.
00516   if (requested_names.length()) {
00517     for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
00518       filename diff_curr = *the_rec->_diffs->get(i);
00519       bool found = false;
00520       for (int j = 0; j < requested_names.length(); j++) {
00521         filename req_curr(requested_names[j]);
00522         if (req_curr.compare_suffix(diff_curr)) {
00523           found = true;
00524           break;
00525         }
00526       }
00527       if (!found) the_rec->_diffs->zap(i, i);
00528     }
00529   }
00530 
00531   req._packed_data.reset();  // clear out existing stuff before cloning.
00532   file_transfer_infoton *reply = (file_transfer_infoton *)req.clone();
00533   the_rec->_diffs->pack(reply->_packed_data);
00534 
00535 //hmmm: does the other side really need the list of filenames?  i guess we
00536 //      could check validity of what's transferred or check space available
00537 //      before the client starts the transfer.
00538 
00539   reply->_request = false;  // it's a response now.
00540   store_product(reply, item_id);
00541     // send back the comparison list.
00542 
00543   return OKAY;
00544 }
00545 
00546 outcome file_transfer_tentacle::handle_tree_compare_response
00547     (file_transfer_infoton &resp, const octopus_request_id &item_id)
00548 {
00549   FUNCDEF("handle_tree_compare_response");
00550   file_transfer_record *the_rec = _transfers->find(item_id._entity,
00551       resp._src_root, resp._dest_root);
00552   if (!the_rec) {
00553     LOG(istring("could not find the record for this transfer: item=")
00554         + item_id.text_form() + " src=" + resp._src_root + " dest="
00555         + resp._dest_root);
00556     return NOT_FOUND;  // not registered, so reject it.
00557   }
00558 
00559   the_rec->_last_active.reset();  // record some activity on this record.
00560 
00561   filename_list *flist = new filename_list;
00562   if (!flist->unpack(resp._packed_data)) {
00563     WHACK(flist);
00564     return GARBAGE;
00565   }
00566 
00567 //hmmm: verify space on device?
00568 
00569   the_rec->_diffs = flist;  // set the list of differences.
00570   return OKAY;
00571 }
00572 
00573 outcome file_transfer_tentacle::handle_storage_request
00574     (file_transfer_infoton &req, const octopus_request_id &item_id)
00575 {
00576   FUNCDEF("handle_storage_request");
00577   if (_only_comparisons) {
00578     // store an unhandled infoton.
00579     unhandled_request *deny = new unhandled_request(item_id, req.classifier(),
00580         NO_HANDLER);
00581     store_product(deny, item_id);
00582     return NO_HANDLER;
00583   }
00584 
00585   // look up the transfer record.
00586   file_transfer_record *the_rec = _transfers->find(item_id._entity,
00587       req._src_root, req._dest_root);
00588   if (!the_rec) {
00589     LOG(istring("could not find the record for this transfer: item=")
00590         + item_id.text_form() + " src=" + req._src_root + " dest="
00591         + req._dest_root);
00592     return NOT_FOUND;  // not registered, so reject it.
00593   }
00594 
00595   the_rec->_last_active.reset();  // mark it as still active.
00596 
00597   file_transfer_infoton *resp = (file_transfer_infoton *)req.clone();
00598 
00599   if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
00600 
00601   outcome bufret = heavy_file_operations::buffer_files
00602       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
00603       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
00604   if (bufret != OKAY) {
00605     // complain, but still send.
00606     LOG(istring("buffer files returned an error on item=")
00607         + item_id.text_form() + " src=" + req._src_root + " dest="
00608         + req._dest_root);
00609   }
00610 
00611   if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
00612     // seems like the transfer is done.
00613 
00614     the_rec->_done = true;
00615 //hmmm: mark the record and time out faster?
00616   }
00617 
00618   resp->_request = false;  // it's a response now.
00619   store_product(resp, item_id);
00620   return bufret;
00621 }
00622 
00623 outcome file_transfer_tentacle::handle_storage_response
00624     (file_transfer_infoton &resp, const octopus_request_id &item_id)
00625 {
00626   FUNCDEF("handle_storage_response");
00627   if (_only_comparisons) {
00628     // not spoken here.
00629     return NO_HANDLER;
00630   }
00631 
00632   // look up the transfer record.
00633   file_transfer_record *the_rec = _transfers->find(item_id._entity,
00634       resp._src_root, resp._dest_root);
00635   if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
00636 
00637   the_rec->_last_active.reset();  // mark it as still active.
00638 
00639   if (!resp._packed_data.length()) {
00640     // mark that we're done now.
00641     the_rec->_done = true;
00642   }
00643 
00644   // chew on all the things they sent us.
00645   while (resp._packed_data.length()) {
00646     file_transfer_header found;
00647     if (!found.unpack(resp._packed_data)) {
00648       // bomb out now.
00649       LOG(istring("corruption seen on item=") + item_id.text_form()
00650           + " src=" + resp._src_root + " dest=" + resp._dest_root);
00651       return GARBAGE;
00652     }
00653 
00654     the_rec->_last_sent = found;
00655 
00656     if (found._length > resp._packed_data.length()) {
00657       // another case for leaving--not enough data left in the buffer.
00658       LOG(istring("data underflow seen on item=") + item_id.text_form()
00659           + " src=" + resp._src_root + " dest=" + resp._dest_root);
00660       return GARBAGE;
00661     }
00662     byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
00663     resp._packed_data.zap(0, found._length - 1);
00664 
00665     if (!the_rec->_diffs) return BAD_INPUT;
00666 
00667     const file_info *recorded_info = the_rec->_diffs->find(found._filename);
00668     if (!recorded_info) {
00669       LOG(istring("unrequested file seen: ") + found._filename);
00670       continue;  // maybe there are others that aren't confused.
00671     }
00672 
00673     istring full_file = resp._dest_root + filename::default_separator()
00674         + recorded_info->secondary();
00675 
00676     outcome ret = heavy_file_operations::write_file_chunk(full_file,
00677         found._byte_start, to_write);
00678     if (ret != OKAY) {
00679       LOG(istring("failed to write file chunk: error=")
00680           + heavy_file_operations::outcome_name(ret) + " file=" + full_file
00681           + isprintf(" start=%d len=%d", found._byte_start, found._length));
00682     }
00683   }
00684 
00685   // there is no response product to store.
00686   return OKAY;
00687 }
00688 
00689 // this is the only method that is allowed to invoke the "handle_X" methods
00690 // and it must lock the object beforehand.
00691 
00692 outcome file_transfer_tentacle::consume(infoton &to_chow,
00693     const octopus_request_id &item_id, byte_array &transformed)
00694 {
00695   FUNCDEF("consume");
00696   transformed.reset();
00697   file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
00698   if (!inf) return DISALLOWED;  // not for us.
00699 
00700   AUTO_LOCK;  // protect our lists while we're working on them.
00701 
00702   switch (inf->_command) {
00703     case file_transfer_infoton::TREE_COMPARISON: {
00704       if (inf->_request) return handle_tree_compare_request(*inf, item_id);
00705       else return handle_tree_compare_response(*inf, item_id);
00706     }
00707     case file_transfer_infoton::PLACE_FILE_CHUNKS: {
00708       if (inf->_request) return handle_storage_request(*inf, item_id);
00709       else return handle_storage_response(*inf, item_id);
00710     }
00711   }
00712   return BAD_INPUT;  // not a recognized command.
00713 }
00714 
00715 outcome file_transfer_tentacle::refresh_now(const istring &source_mapping)
00716 {
00717   FUNCDEF("refresh_now");
00718   AUTO_LOCK;
00719   for (int i = 0; i < _correspondences->elements(); i++) {
00720     file_transfer_record *curr = _correspondences->borrow(i);
00721     if (!curr) continue;
00722     if (curr->_source_mapping != source_mapping) continue;
00723     if (curr->_local_dir) {
00724 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00725       LOG(istring("refreshing tree for: ent=") + curr->_ent.text_form()
00726           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
00727 #endif
00728       WHACK(curr->_local_dir);
00729       curr->_local_dir = new directory_tree(curr->_src_root);
00730       curr->_local_dir->calculate();
00731 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
00732       LOG(istring("done refreshing tree for: ent=") + curr->_ent.text_form()
00733           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
00734 #endif
00735     }
00736     curr->_last_active.reset();  // reset our action time.
00737     return OKAY;
00738   }
00739   return NOT_FOUND;
00740 }
00741 
00742 
00743 #endif //FILE_TRANSFER_TENTACLE_IMPLEMENTATION_FILE
00744 

Generated on Fri Nov 28 04:29:32 2008 for HOOPLE Libraries by  doxygen 1.5.1