grid_processor.cpp

Go to the documentation of this file.
00001 /*****************************************************************************\
00002 *                                                                             *
00003 *  Name   : grid_processor                                                    *
00004 *  Author : Chris Koeritz                                                     *
00005 *                                                                             *
00006 *******************************************************************************
00007 * Copyright (c) 2008-$now By Author.  This program is free software; you can  *
00008 * redistribute it and/or modify it under the terms of the GNU General Public  *
00009 * License as published by the Free Software Foundation; either version 2 of   *
00010 * the License or (at your option) any later version.  This is online at:      *
00011 *     http://www.fsf.org/copyleft/gpl.html                                    *
00012 * Please send any updates to: fred@gruntose.com                               *
00013 \*****************************************************************************/
00014 
00015 #include "common.h"
00016 #include "grid_processor.h"
00017 #include "login_info.h"
00018 
00019 #include <basis/function.h>
00020 #include <basis/log_base.h>
00021 #include <basis/portable.h>
00022 #include <basis/set.cpp>
00023 #include <basis/string_array.h>
00024 #include <data_struct/deep_array.cpp>
00025 #include <mechanisms/time_stamp.cpp>
00026 #include <opsystem/byte_filer.h>
00027 #include <opsystem/directory.h>
00028 #include <opsystem/filename.h>
00029 #include <opsystem/ini_config.h>
00030 #include <textual/list_parsing.h>
00031 
00032 #include <malloc.h>
00033 
00034 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), filename(portable::application_name()).basename().raw() + " " + s)
00035 
00036 const int GRID_CHECK_PAUSE = int(2.5 * MINUTE_ms);
00037 
00039 
00040 class float_array : public array<float> {};
00041 
00043 
00044 grid_processor::grid_processor(query_handler &work_with)
00045 : _querier(work_with),
00046   _startup_time(new time_stamp),
00047   _complain_on_missing_grid(true)
00048 {
00049 }
00050 
00051 grid_processor::~grid_processor()
00052 {
00053   WHACK(_startup_time);
00054 }
00055 
00056 bool grid_processor::write_bogus_row(byte_filer &df_file, int data_source,
00057    int row_number)
00058 {
00059   FUNCDEF("write_bogus_row");
00060   DBPROCESS *current_db = _querier.current_db();
00061   if (!current_db) { LOG("ERROR: null database object"); return false; }
00062 
00063 //LOG("got to fudging rows now...");
00064 
00065   bool to_return = true;
00066   for (int i = 1; i <= dbnumcols(current_db); i++) {
00067     string_array items;
00068     // add the datafield id.
00069     items += isprintf("%d", row_number * 100 + i);
00070 
00071     // add the status.
00072     items += "UNAVAILABLE";
00073     int type = dbcoltype(current_db, i);
00074     switch (type) {
00075       case SYBINT1:
00076       case SYBINT2:
00077       case SYBINT4: {
00078         items += "INT32";
00079         // multiply by the factor we were given to calculate the final value.
00080         items += "-1";
00081         break;
00082       }
00083       case SYBCHAR:
00084       case SYBVARCHAR:
00085       case SYBTEXT: {
00086 //STRINGBIND
00087 //VARYCHARBIND
00088         items += "STRING";
00089         items += "blank";
00090         break;
00091       }
00092       default: {
00093         items += "STRING";
00094         items += "unhandled";
00095         break;
00096       }
00097     }
00098 
00099     // create this line as a CSV formatted string and put it in the file.
00100     istring output_line;
00101     list_parsing::create_csv_line(items, output_line);
00102     output_line += log_base::platform_ending();
00103     df_file.write(output_line);
00104   }
00105 
00106   return to_return;
00107 }
00108 
00109 bool grid_processor::write_fields_for_row(byte_filer &df_file, int data_source,
00110     const istring &key_column, const string_array &row_names,
00111     float_array &multipliers, int_set &rows_seen)
00112 {
00113   FUNCDEF("write_fields_for_row");
00114   DBPROCESS *current_db = _querier.current_db();
00115   if (!current_db) { LOG("ERROR: null database object"); return false; }
00116 
00117   int current_row = -1;  // until we know which row it is.
00118 
00119 //LOG(isprintf("got numcols = %d", dbnumcols(current_db)));
00120 
00121   // first figure out which freaking row this is.
00122   for (int i = 1; i <= dbnumcols(current_db); i++) {
00123     if (current_row != -1) break;  // found a match already.
00124     istring cur_col_name = dbcolname(current_db, i);
00125     if (cur_col_name == key_column) {
00126       // this is the right column; it's the key.  now compare its values.
00127 //LOG(isprintf("found column same as key at col# %d", i));
00128       istring found_at_i = _querier.get_bound_string(i);
00129       // handle a key that's an integer column.
00130       if ( (dbcoltype(current_db, i) == SYBINT1)
00131           || (dbcoltype(current_db, i) == SYBINT2)
00132           || (dbcoltype(current_db, i) == SYBINT4) ) {
00133         found_at_i = isprintf("%d", _querier.get_bound_int(i));
00134       }
00135 
00136       // search through the rows and see if this is an expected row.
00137       for (int j = 0; j < row_names.length(); j++) {
00138         if (found_at_i.iequals(row_names[j])) {
00139           // found the right row name for the row we got from DB.
00140           current_row = j;
00141           break;
00142         }
00143       }
00144 //LOG("breaking out due to having found correct column");
00145       // we found the right column already, so stop looking.
00146       break;
00147     }
00148   }
00149 
00150 //LOG(isprintf("db num cols = %d", dbnumcols(current_db)));
00151 
00152   if (negative(current_row)) {
00153     // we never found the column; complain.
00154     LOG(istring("ERROR: could not find column '") + key_column
00155         + "' to determine what row this is.  columns are:");
00156     istring show_cols;
00157     for (int i = 1; i <= dbnumcols(current_db); i++) {
00158       if (i > 1) show_cols += "\t";
00159       show_cols += dbcolname(current_db, i);
00160     }
00161     LOG(show_cols);
00162     return false;
00163   }
00164 
00165 //LOG("got to scanning rows now...");
00166 
00167   // we know what row we *think* it's at, but if there were multiple of
00168   // the same row, we need to fill in all the lines with the one match.
00169   istring matching_row_name = row_names[current_row];
00170   int_set rows_to_fill;
00171   for (int j = 0; j < row_names.length(); j++) {
00172     // at least one of these should be our original match.
00173     if (matching_row_name.iequals(row_names[j]))
00174       rows_to_fill += j;
00175   }
00176 
00177   rows_seen += rows_to_fill;  // remember which ones we already hit.
00178 
00179   bool to_return = true;
00180   for (int fill_which = 0; fill_which < rows_to_fill.length(); fill_which++) {
00181     // get the next row # on display that gets this line of data.
00182     int row_to_store = rows_to_fill[fill_which];
00183     for (int i = 1; i <= dbnumcols(current_db); i++) {
00184       string_array items;
00185       // add the datafield id.
00186       items += isprintf("%d", row_to_store * 100 + i);
00187 
00188       // add the status.
00189       items += "OK";
00190       int type = dbcoltype(current_db, i);
00191       switch (type) {
00192         case SYBINT1:
00193         case SYBINT2:
00194         case SYBINT4: {
00195           items += "INT32";
00196           // multiply by the factor we were given to calculate the final value.
00197           int new_value = int(multipliers[i - 1] * _querier.get_bound_int(i));
00198           items += isprintf("%d", new_value);
00199           break;
00200         }
00201         case SYBCHAR:
00202         case SYBVARCHAR:
00203         case SYBTEXT: {
00204 //STRINGBIND
00205 //VARYCHARBIND
00206           items += "STRING";
00207           istring found = _querier.get_bound_string(i);
00208           found.shrink();  // get proper length.
00209           items += found;
00210           break;
00211         }
00212         default: {
00213           items += "STRING";
00214           items += "unhandled";
00215           break;
00216         }
00217       }
00218 
00219       // create this line as a CSV formatted string and put it in the file.
00220       istring output_line;
00221       list_parsing::create_csv_line(items, output_line);
00222       output_line += log_base::platform_ending();
00223       df_file.write(output_line);
00224     }
00225   }
00226   return to_return;
00227 }
00228 
00229 void grid_processor::write_data_fields(int data_source,
00230     const istring &df_storage_dir, const istring &key_column,
00231     const string_array &row_names, float_array &multipliers)
00232 {
00233   FUNCDEF("write_data_fields");
00234   DBPROCESS *current_db = _querier.current_db();
00235   if (!current_db) { LOG("ERROR: null database object"); return; }
00236   int db_result = REG_ROW;
00237 
00238   bool bind_worked = _querier.bind_columns();
00239   if (!bind_worked) {
00240 //hmmm: bail out when the result comes back other than success.
00241     return;
00242   }
00243 
00244   // open up our output file where we will dump all the result data.
00245   byte_filer df_file(istring(df_storage_dir)
00246       + isprintf("/df_%d.csv", data_source), "wb");
00247 
00248   int_set rows_seen;  // tracks which rows were handled.
00249 
00250   while (db_result == REG_ROW) {
00251     // see whether there's a next row.
00252     db_result = dbnextrow(current_db);
00253     if (db_result == REG_ROW) {
00254       write_fields_for_row(df_file, data_source, key_column, row_names,
00255           multipliers, rows_seen);
00256     } else if (db_result != NO_MORE_ROWS) {
00257       // report that an unexpected error occurred.      
00258       LOG(isprintf("ERROR: failure to get next row, error code is %d", db_result));
00259     }
00260   }
00261 
00262   int_set all_rows_expected;
00263   for (int i = 0; i < row_names.length(); i++) {
00264     all_rows_expected += i;
00265   }
00266   int_set missing_rows = all_rows_expected - rows_seen;
00267   for (int i = 0; i < missing_rows.length(); i++) {
00268     int one_row_missing = missing_rows[i];
00269     write_bogus_row(df_file, data_source, one_row_missing);
00270   }
00271 
00272   df_file.close();
00273 }
00274 
00275 bool grid_processor::process_query_as_df(const istring &query, int data_source,
00276     const istring &df_storage_dir, const istring &key_column,
00277     const string_array &row_names, float_array &multipliers)
00278 {
00279   FUNCDEF("process_query_as_df");
00280   DBPROCESS *current_db = _querier.current_db();
00281   if (!current_db) { LOG("ERROR: null database object"); return false; }
00282 //LOG(istring("process_query_as_df on: ") + query);
00283   dbfreebuf(current_db);
00284   dbcmd(current_db, query.s());
00285   dbsqlexec(current_db);
00286 
00287   if (dbresults(current_db) != SUCCEED) {
00288     LOG(isprintf("ERROR: was expecting a result set and got none."));
00290     return false;
00291   }
00292   write_data_fields(data_source, df_storage_dir, key_column,
00293       row_names, multipliers);
00294   return true;
00295 }
00296 
00297 void grid_processor::reset_data_fields(const istring &df_storage_dir,
00298     int data_source)
00299 {
00300   FUNCDEF("reset_data_fields");
00301   byte_filer df_file(istring(df_storage_dir)
00302       + isprintf("/df_%d.csv", data_source), "rb");
00303   string_array lines;
00304   while (!df_file.eof()) {
00305     istring line;
00306     int chars = df_file.getline(line, 2048);
00307       // ridiculous but sufficient limit.
00308     if (chars > 0) {
00309       lines += line;
00310     }
00311   }
00312   df_file.close();
00313   df_file.open(istring(df_storage_dir)
00314       + isprintf("/df_%d.csv", data_source), "wb");
00315   for (int i = 0; i < lines.length(); i++) {
00316     istring line = lines[i];
00317     int comma_indy = line.find(',', 0);
00318     if (negative(comma_indy)) {
00319       LOG(istring("ERROR: failed to parse line: ") + line);
00320       return;  // hose-up.  stop.
00321     }
00322     int start_word = comma_indy + 2;  // skip comma and quote.
00323     istring word_accum;
00324     int last_word_indy = -1;
00325     for (int j = start_word; j < line.length(); j++) {
00326       char curr = line[j];
00327       if ( (curr != ',') && (curr != '"') ) {
00328         word_accum += istring(curr, 1);
00329         last_word_indy = j;
00330       } else break;  // done looking.
00331     }
00332     if (!word_accum.iequals("unavailable")) {
00333       // take out the wrong word and replace it.
00334       istring new_line = line.substring(0, start_word - 1);
00335       new_line += "UNAVAILABLE";
00336       new_line += line.substring(last_word_indy + 1, line.end());
00337       df_file.write(new_line);
00338     } else {
00339       // no processing on the line.
00340       df_file.write(line);
00341     }
00342   }
00343   df_file.close();
00344 }
00345 
00346 void grid_processor::process_grid_files(const istring &grid_storage_folder,
00347     const istring &df_storage_dir, database_login_info &persistent_login_info)
00348 {
00349     FUNCDEF("process_grid_files");
00350     // locate all grid definition files in the directory stated.
00351     directory grid_dir(grid_storage_folder, "*.def");
00352 
00353     if (_complain_on_missing_grid && !grid_dir.files().length()
00354         && (time_stamp().value() - _startup_time->value()
00355             > GRID_CHECK_PAUSE) ) {
00356       // there's no grid file after a substantial pause, that's worth
00357       // reporting to the user.
00358       LOG(isprintf("There are no grid definition files after %.1f minutes; "
00359           "this seems wrong.", float(GRID_CHECK_PAUSE) / MINUTE_ms));
00360       _complain_on_missing_grid = false;  // record that we already complained.
00361     }
00362 
00363     // for each file in that directory, process the database queries...
00364     for (int file_indy = 0; file_indy < grid_dir.files().length();
00365         file_indy++) {
00366       _complain_on_missing_grid = false;  // record that we saw a filename.
00367 
00368       istring current_file = grid_dir.files()[file_indy];
00369       if (_last_grid_seen != current_file) {
00370         LOG(istring("reading grid file: ") + current_file);
00371         _last_grid_seen = current_file;
00372       }
00373       // calculate which data source this grid file is for.
00374       istring just_ds = current_file.substring(0, current_file.length() - 5);
00375         // pare off the ".def" ending.
00376       int underscore_indy = just_ds.find('_', just_ds.end(), true);
00377       just_ds.zap(0, underscore_indy);
00378       int data_source = just_ds.convert(-1);
00379       if (data_source == -1) {
00380         LOG(istring("ERROR: failed to parse data source ID from file name: ")
00381             + current_file);
00382         continue;
00383       }
00384   //LOG(isprintf("ds id is %d", data_source));
00385 
00386       // create the full path to the grid definition.
00387       istring grid_def = grid_dir.path() + "/" + current_file;
00388 
00389       bool db_ready = _querier.prepare_for_query(grid_def, persistent_login_info);
00390       if (!db_ready) {
00391         // prepare_for_query already does enough logging.  just bail on this.
00392         reset_data_fields(df_storage_dir, data_source);
00393         continue;
00394       }
00395 
00396   //could be moved to a read db grid query method...
00397       // open grid definition as an INI file.
00398       ini_configurator ini(grid_def);
00399       // read the column names from the ini file.
00400       string_array col_names;
00401       float_array multipliers;  // values to multiply int fields by.
00402       istring curr_item = "bogus";  // satisfy loop condition.
00403       for (int i = 0; curr_item != ""; i++) {
00404         curr_item = ini.load(isprintf("column_%d", i), "dbcol", "");
00405         if (curr_item.t()) {
00406           istring mult = ini.load(isprintf("column_%d", i), "multiplier", "");
00407   //LOG(istring("found mult of ") + mult);
00408           if (mult.t()) multipliers += mult.convert(1.0);
00409           else multipliers += 1.0;
00410   //LOG(isprintf("calculated mult of %f", multipliers[multipliers.length() - 1]));
00411           col_names += curr_item;
00412         }
00413       }
00414       if (!col_names.length()) {
00415         LOG("ERROR: there was no column info garnered from grid definition.");
00416         reset_data_fields(df_storage_dir, data_source);
00417         continue;
00418       }
00419 
00420       istring key_column = ini.load("rows", "key_column", "");
00421       if (!key_column) {
00422         LOG("ERROR: the key column for row queries seems absent from grid definition.");
00423         reset_data_fields(df_storage_dir, data_source);
00424         continue;
00425       }
00426 
00427       // read the row information from the ini file.
00428       string_array row_keys;
00429       curr_item = "bogus";
00430       for (int i = 0; curr_item != ""; i++) {
00431         curr_item = ini.load("rows", isprintf("rowkey_%d", i), "");
00432         if (curr_item.t()) {
00433           row_keys += curr_item;
00434   //LOG(isprintf("row key #%d is ", i) + curr_item);
00435         }
00436       }
00437       if (!row_keys.length()) {
00438         LOG("ERROR: there was no row key info found in grid definition.");
00439         reset_data_fields(df_storage_dir, data_source);
00440         continue;
00441       }
00442 
00443       // first line in the query picks our escape character.
00444       istring query;
00445       // now we start the actual query for data.
00446       query += "select ";
00447       bool saw_key_col = false;
00448       for (int i = 0; i < col_names.length(); i++) {
00449         if (i) query += ", ";
00450         istring safe_colname = col_names[i];
00451         common_database_support::nerf_special_characters(safe_colname,
00452             DB_FREETDS_SQL_ESCAPE_CHARACTER);
00453         query += safe_colname;
00454         // make sure the query has the key column in it someplace.  this
00455         // allows us to know the order that the results should be shown in.
00456         if (col_names[i] == key_column) saw_key_col = true;
00457       }
00458 
00459       istring temp_key_col = key_column;
00460       common_database_support::nerf_special_characters(temp_key_col,
00461           DB_FREETDS_SQL_ESCAPE_CHARACTER);
00462 
00463       if (!saw_key_col) {
00464         // we assume there are other columns other than the key column; if not,
00465         // then what kind of query is this anyway?
00466         query += istring(", ") + temp_key_col;
00467       }
00468 
00469       query += " from ";
00470       query += persistent_login_info.table_name;
00471       query += " where ";
00472       query += temp_key_col;  // pre-nerfed.
00473       query += " in (";
00474       for (int i = 0; i < row_keys.length(); i++) {
00475         if (i) query += ", ";
00476         istring temp_rowkey = row_keys[i];
00477         common_database_support::nerf_special_characters(temp_rowkey,
00478             DB_FREETDS_SQL_ESCAPE_CHARACTER);
00479         // quotes are needed around string values and will work with non-strings.
00480         query += istring("'") + temp_rowkey + "'";
00481       }
00482       query += ")";
00483   //end of read db grid query method.
00484 
00485       // now chew on the database and store the results as data fields.
00486       bool worked = process_query_as_df(query, data_source, df_storage_dir,
00487           key_column, row_keys, multipliers);
00488       if (!worked) {
00489         LOG("ERROR: failed to process the query and acquire data fields.");
00490         reset_data_fields(df_storage_dir, data_source);
00491       }
00492       dbcanquery(_querier.current_db());
00493     }
00494   }
00495 

Generated on Thu Nov 20 04:28:56 2008 for HOOPLE Libraries by  doxygen 1.5.1