00001 #ifndef TENTACLE_IMPLEMENTATION_FILE
00002 #define TENTACLE_IMPLEMENTATION_FILE
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "entity_data_bin.h"
00019 #include "entity_defs.h"
00020 #include "infoton.h"
00021 #include "tentacle.h"
00022
00023 #include <basis/istring.h>
00024 #include <basis/log_base.h>
00025 #include <basis/mutex.h>
00026 #include <data_struct/amorph.cpp>
00027 #include <mechanisms/ithread.h>
00028
00029
00030
00031
// GRAB_CONSUMER_LOCK declares a local auto_synchronizer named "l" that is
// built on the tentacle's _input_guard mutex.  it is used as the first
// statement of enqueue() and next_request(), the two methods that touch the
// _pending queue.  presumably the synchronizer holds the mutex until the end
// of the enclosing scope -- confirm against auto_synchronizer's declaration.
// the macro expands to a reference to _input_guard, so it can only be used
// inside tentacle member functions.
00032 #undef GRAB_CONSUMER_LOCK
00033 #define GRAB_CONSUMER_LOCK auto_synchronizer l(*_input_guard)
00034
// LOG(t) forwards the text "t" to CLASS_EMERGENCY_LOG using the logger
// returned by program_wide_logger().  in this file it is only invoked inside
// DEBUG_TENTACLE sections, in functions that have already invoked FUNCDEF.
// NOTE(review): CLASS_EMERGENCY_LOG presumably relies on the name set up by
// FUNCDEF -- confirm against basis/log_base.h.
00035 #undef LOG
00036 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger(), t)
00037
00039
00040 struct infoton_record {
00041 infoton *_product;
00042 octopus_request_id _id;
00043
00044 infoton_record(infoton *product, octopus_request_id id)
00045 : _product(product), _id(id) {}
00046
00047 ~infoton_record() { WHACK(_product); }
00048 };
00049
00050 class queueton : public amorph<infoton_record> {};
00051
00053
00054 class pod_motivator : public ithread
00055 {
00056 public:
00057 pod_motivator(tentacle &parent, int motivational_rate)
00058 : ithread(motivational_rate, ithread::SLACK_INTERVAL),
00059 _parent(parent) {}
00060
00061 void perform_activity(void *formal(ptr)) { _parent.propel_arm(); }
00062
00063 private:
00064 tentacle &_parent;
00065 };
00066
00068
00069 tentacle::tentacle(const string_array &group_name, bool backgrounded,
00070 int motivational_rate)
00071 : _group(new string_array(group_name)),
00072 _pending(new queueton),
00073 _input_guard(new mutex),
00074 _action(NIL),
00075 _products(NIL),
00076 _backgrounded(backgrounded)
00077 {
00078
00079 if (backgrounded)
00080 _action = new pod_motivator(*this, motivational_rate);
00081 }
00082
00083 tentacle::~tentacle()
00084 {
00085 if (_action) _action->stop();
00086 WHACK(_action);
00087 WHACK(_group);
00088 WHACK(_pending);
00089 WHACK(_input_guard);
00090 }
00091
00092 const string_array &tentacle::group() const { return *_group; }
00093
00094 const char *tentacle::outcome_name(const outcome &to_name)
00095 { return common::outcome_name(to_name); }
00096
00097 int tentacle::motivational_rate() const
00098 { if (_action) return _action->sleep_time(); else return 0; }
00099
00100 void tentacle::attach_storage(entity_data_bin &storage)
00101 {
00102 _products = &storage;
00103 if (_action) _action->start(NIL);
00104 }
00105
00106 void tentacle::detach_storage()
00107 {
00108 if (_action) _action->stop();
00109 _products = NIL;
00110 }
00111
00112 bool tentacle::store_product(infoton *product,
00113 const octopus_request_id &original_id)
00114 {
00115 FUNCDEF("store_product");
00116 if (!_products) {
00117 #ifdef DEBUG_TENTACLE
00118 LOG("storage bunker has not been established!");
00119 #endif
00120 return false;
00121 }
00122 return _products->add_item(product, original_id);
00123 }
00124
00125 outcome tentacle::enqueue(infoton *to_chow, const octopus_request_id &item_id)
00126 {
00127 GRAB_CONSUMER_LOCK;
00128 int max_size = 0;
00129
00130
00131
00132
00133 if (_products)
00134 max_size = _products->max_bytes_per_entity();
00135 int curr_size = 0;
00136 if (max_size) {
00137
00138 for (int i = 0; i < _pending->elements(); i++) {
00139 curr_size += _pending->borrow(i)->_product->packed_size();
00140 }
00141 if (curr_size + to_chow->packed_size() > max_size) {
00142 WHACK(to_chow);
00143 return NO_SPACE;
00144 }
00145 }
00146 *_pending += new infoton_record(to_chow, item_id);
00147
00148
00149 return OKAY;
00150 }
00151
00152 infoton *tentacle::next_request(octopus_request_id &item_id)
00153 {
00154 GRAB_CONSUMER_LOCK;
00155 if (!_pending->elements()) return NIL;
00156 infoton *to_return = (*_pending)[0]->_product;
00157 (*_pending)[0]->_product = NIL;
00158
00159 item_id = (*_pending)[0]->_id;
00160 _pending->zap(0, 0);
00161 return to_return;
00162 }
00163
00164 void tentacle::propel_arm()
00165 {
00166 FUNCDEF("propel_arm");
00167 infoton *next_item = NIL;
00168 do {
00169 octopus_request_id id;
00170 next_item = next_request(id);
00171 if (!next_item) break;
00172 byte_array ignored;
00173 outcome ret = consume(*next_item, id, ignored);
00174 if (ret != OKAY) {
00175 #ifdef DEBUG_TENTACLE
00176 LOG(istring("failed to act on ") + next_item->classifier().text_form());
00177 #endif
00178 }
00179 WHACK(next_item);
00180 } while (next_item);
00181 }
00182
00183
00184 #endif //TENTACLE_IMPLEMENTATION_FILE
00185