00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "common.h"
00016 #include "grid_processor.h"
00017 #include "login_info.h"
00018
00019 #include <basis/function.h>
00020 #include <basis/log_base.h>
00021 #include <basis/portable.h>
00022 #include <basis/set.cpp>
00023 #include <basis/string_array.h>
00024 #include <data_struct/deep_array.cpp>
00025 #include <mechanisms/time_stamp.cpp>
00026 #include <opsystem/byte_filer.h>
00027 #include <opsystem/directory.h>
00028 #include <opsystem/filename.h>
00029 #include <opsystem/ini_config.h>
00030 #include <textual/list_parsing.h>
00031
00032 #include <malloc.h>
00033
00034 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), filename(portable::application_name()).basename().raw() + " " + s)
00035
00036 const int GRID_CHECK_PAUSE = int(2.5 * MINUTE_ms);
00037
00039
00040 class float_array : public array<float> {};
00041
00043
00044 grid_processor::grid_processor(query_handler &work_with)
00045 : _querier(work_with),
00046 _startup_time(new time_stamp),
00047 _complain_on_missing_grid(true)
00048 {
00049 }
00050
00051 grid_processor::~grid_processor()
00052 {
00053 WHACK(_startup_time);
00054 }
00055
00056 bool grid_processor::write_bogus_row(byte_filer &df_file, int data_source,
00057 int row_number)
00058 {
00059 FUNCDEF("write_bogus_row");
00060 DBPROCESS *current_db = _querier.current_db();
00061 if (!current_db) { LOG("ERROR: null database object"); return false; }
00062
00063
00064
00065 bool to_return = true;
00066 for (int i = 1; i <= dbnumcols(current_db); i++) {
00067 string_array items;
00068
00069 items += isprintf("%d", row_number * 100 + i);
00070
00071
00072 items += "UNAVAILABLE";
00073 int type = dbcoltype(current_db, i);
00074 switch (type) {
00075 case SYBINT1:
00076 case SYBINT2:
00077 case SYBINT4: {
00078 items += "INT32";
00079
00080 items += "-1";
00081 break;
00082 }
00083 case SYBCHAR:
00084 case SYBVARCHAR:
00085 case SYBTEXT: {
00086
00087
00088 items += "STRING";
00089 items += "blank";
00090 break;
00091 }
00092 default: {
00093 items += "STRING";
00094 items += "unhandled";
00095 break;
00096 }
00097 }
00098
00099
00100 istring output_line;
00101 list_parsing::create_csv_line(items, output_line);
00102 output_line += log_base::platform_ending();
00103 df_file.write(output_line);
00104 }
00105
00106 return to_return;
00107 }
00108
00109 bool grid_processor::write_fields_for_row(byte_filer &df_file, int data_source,
00110 const istring &key_column, const string_array &row_names,
00111 float_array &multipliers, int_set &rows_seen)
00112 {
00113 FUNCDEF("write_fields_for_row");
00114 DBPROCESS *current_db = _querier.current_db();
00115 if (!current_db) { LOG("ERROR: null database object"); return false; }
00116
00117 int current_row = -1;
00118
00119
00120
00121
00122 for (int i = 1; i <= dbnumcols(current_db); i++) {
00123 if (current_row != -1) break;
00124 istring cur_col_name = dbcolname(current_db, i);
00125 if (cur_col_name == key_column) {
00126
00127
00128 istring found_at_i = _querier.get_bound_string(i);
00129
00130 if ( (dbcoltype(current_db, i) == SYBINT1)
00131 || (dbcoltype(current_db, i) == SYBINT2)
00132 || (dbcoltype(current_db, i) == SYBINT4) ) {
00133 found_at_i = isprintf("%d", _querier.get_bound_int(i));
00134 }
00135
00136
00137 for (int j = 0; j < row_names.length(); j++) {
00138 if (found_at_i.iequals(row_names[j])) {
00139
00140 current_row = j;
00141 break;
00142 }
00143 }
00144
00145
00146 break;
00147 }
00148 }
00149
00150
00151
00152 if (negative(current_row)) {
00153
00154 LOG(istring("ERROR: could not find column '") + key_column
00155 + "' to determine what row this is. columns are:");
00156 istring show_cols;
00157 for (int i = 1; i <= dbnumcols(current_db); i++) {
00158 if (i > 1) show_cols += "\t";
00159 show_cols += dbcolname(current_db, i);
00160 }
00161 LOG(show_cols);
00162 return false;
00163 }
00164
00165
00166
00167
00168
00169 istring matching_row_name = row_names[current_row];
00170 int_set rows_to_fill;
00171 for (int j = 0; j < row_names.length(); j++) {
00172
00173 if (matching_row_name.iequals(row_names[j]))
00174 rows_to_fill += j;
00175 }
00176
00177 rows_seen += rows_to_fill;
00178
00179 bool to_return = true;
00180 for (int fill_which = 0; fill_which < rows_to_fill.length(); fill_which++) {
00181
00182 int row_to_store = rows_to_fill[fill_which];
00183 for (int i = 1; i <= dbnumcols(current_db); i++) {
00184 string_array items;
00185
00186 items += isprintf("%d", row_to_store * 100 + i);
00187
00188
00189 items += "OK";
00190 int type = dbcoltype(current_db, i);
00191 switch (type) {
00192 case SYBINT1:
00193 case SYBINT2:
00194 case SYBINT4: {
00195 items += "INT32";
00196
00197 int new_value = int(multipliers[i - 1] * _querier.get_bound_int(i));
00198 items += isprintf("%d", new_value);
00199 break;
00200 }
00201 case SYBCHAR:
00202 case SYBVARCHAR:
00203 case SYBTEXT: {
00204
00205
00206 items += "STRING";
00207 istring found = _querier.get_bound_string(i);
00208 found.shrink();
00209 items += found;
00210 break;
00211 }
00212 default: {
00213 items += "STRING";
00214 items += "unhandled";
00215 break;
00216 }
00217 }
00218
00219
00220 istring output_line;
00221 list_parsing::create_csv_line(items, output_line);
00222 output_line += log_base::platform_ending();
00223 df_file.write(output_line);
00224 }
00225 }
00226 return to_return;
00227 }
00228
00229 void grid_processor::write_data_fields(int data_source,
00230 const istring &df_storage_dir, const istring &key_column,
00231 const string_array &row_names, float_array &multipliers)
00232 {
00233 FUNCDEF("write_data_fields");
00234 DBPROCESS *current_db = _querier.current_db();
00235 if (!current_db) { LOG("ERROR: null database object"); return; }
00236 int db_result = REG_ROW;
00237
00238 bool bind_worked = _querier.bind_columns();
00239 if (!bind_worked) {
00240
00241 return;
00242 }
00243
00244
00245 byte_filer df_file(istring(df_storage_dir)
00246 + isprintf("/df_%d.csv", data_source), "wb");
00247
00248 int_set rows_seen;
00249
00250 while (db_result == REG_ROW) {
00251
00252 db_result = dbnextrow(current_db);
00253 if (db_result == REG_ROW) {
00254 write_fields_for_row(df_file, data_source, key_column, row_names,
00255 multipliers, rows_seen);
00256 } else if (db_result != NO_MORE_ROWS) {
00257
00258 LOG(isprintf("ERROR: failure to get next row, error code is %d", db_result));
00259 }
00260 }
00261
00262 int_set all_rows_expected;
00263 for (int i = 0; i < row_names.length(); i++) {
00264 all_rows_expected += i;
00265 }
00266 int_set missing_rows = all_rows_expected - rows_seen;
00267 for (int i = 0; i < missing_rows.length(); i++) {
00268 int one_row_missing = missing_rows[i];
00269 write_bogus_row(df_file, data_source, one_row_missing);
00270 }
00271
00272 df_file.close();
00273 }
00274
00275 bool grid_processor::process_query_as_df(const istring &query, int data_source,
00276 const istring &df_storage_dir, const istring &key_column,
00277 const string_array &row_names, float_array &multipliers)
00278 {
00279 FUNCDEF("process_query_as_df");
00280 DBPROCESS *current_db = _querier.current_db();
00281 if (!current_db) { LOG("ERROR: null database object"); return false; }
00282
00283 dbfreebuf(current_db);
00284 dbcmd(current_db, query.s());
00285 dbsqlexec(current_db);
00286
00287 if (dbresults(current_db) != SUCCEED) {
00288 LOG(isprintf("ERROR: was expecting a result set and got none."));
00290 return false;
00291 }
00292 write_data_fields(data_source, df_storage_dir, key_column,
00293 row_names, multipliers);
00294 return true;
00295 }
00296
00297 void grid_processor::reset_data_fields(const istring &df_storage_dir,
00298 int data_source)
00299 {
00300 FUNCDEF("reset_data_fields");
00301 byte_filer df_file(istring(df_storage_dir)
00302 + isprintf("/df_%d.csv", data_source), "rb");
00303 string_array lines;
00304 while (!df_file.eof()) {
00305 istring line;
00306 int chars = df_file.getline(line, 2048);
00307
00308 if (chars > 0) {
00309 lines += line;
00310 }
00311 }
00312 df_file.close();
00313 df_file.open(istring(df_storage_dir)
00314 + isprintf("/df_%d.csv", data_source), "wb");
00315 for (int i = 0; i < lines.length(); i++) {
00316 istring line = lines[i];
00317 int comma_indy = line.find(',', 0);
00318 if (negative(comma_indy)) {
00319 LOG(istring("ERROR: failed to parse line: ") + line);
00320 return;
00321 }
00322 int start_word = comma_indy + 2;
00323 istring word_accum;
00324 int last_word_indy = -1;
00325 for (int j = start_word; j < line.length(); j++) {
00326 char curr = line[j];
00327 if ( (curr != ',') && (curr != '"') ) {
00328 word_accum += istring(curr, 1);
00329 last_word_indy = j;
00330 } else break;
00331 }
00332 if (!word_accum.iequals("unavailable")) {
00333
00334 istring new_line = line.substring(0, start_word - 1);
00335 new_line += "UNAVAILABLE";
00336 new_line += line.substring(last_word_indy + 1, line.end());
00337 df_file.write(new_line);
00338 } else {
00339
00340 df_file.write(line);
00341 }
00342 }
00343 df_file.close();
00344 }
00345
00346 void grid_processor::process_grid_files(const istring &grid_storage_folder,
00347 const istring &df_storage_dir, database_login_info &persistent_login_info)
00348 {
00349 FUNCDEF("process_grid_files");
00350
00351 directory grid_dir(grid_storage_folder, "*.def");
00352
00353 if (_complain_on_missing_grid && !grid_dir.files().length()
00354 && (time_stamp().value() - _startup_time->value()
00355 > GRID_CHECK_PAUSE) ) {
00356
00357
00358 LOG(isprintf("There are no grid definition files after %.1f minutes; "
00359 "this seems wrong.", float(GRID_CHECK_PAUSE) / MINUTE_ms));
00360 _complain_on_missing_grid = false;
00361 }
00362
00363
00364 for (int file_indy = 0; file_indy < grid_dir.files().length();
00365 file_indy++) {
00366 _complain_on_missing_grid = false;
00367
00368 istring current_file = grid_dir.files()[file_indy];
00369 if (_last_grid_seen != current_file) {
00370 LOG(istring("reading grid file: ") + current_file);
00371 _last_grid_seen = current_file;
00372 }
00373
00374 istring just_ds = current_file.substring(0, current_file.length() - 5);
00375
00376 int underscore_indy = just_ds.find('_', just_ds.end(), true);
00377 just_ds.zap(0, underscore_indy);
00378 int data_source = just_ds.convert(-1);
00379 if (data_source == -1) {
00380 LOG(istring("ERROR: failed to parse data source ID from file name: ")
00381 + current_file);
00382 continue;
00383 }
00384
00385
00386
00387 istring grid_def = grid_dir.path() + "/" + current_file;
00388
00389 bool db_ready = _querier.prepare_for_query(grid_def, persistent_login_info);
00390 if (!db_ready) {
00391
00392 reset_data_fields(df_storage_dir, data_source);
00393 continue;
00394 }
00395
00396
00397
00398 ini_configurator ini(grid_def);
00399
00400 string_array col_names;
00401 float_array multipliers;
00402 istring curr_item = "bogus";
00403 for (int i = 0; curr_item != ""; i++) {
00404 curr_item = ini.load(isprintf("column_%d", i), "dbcol", "");
00405 if (curr_item.t()) {
00406 istring mult = ini.load(isprintf("column_%d", i), "multiplier", "");
00407
00408 if (mult.t()) multipliers += mult.convert(1.0);
00409 else multipliers += 1.0;
00410
00411 col_names += curr_item;
00412 }
00413 }
00414 if (!col_names.length()) {
00415 LOG("ERROR: there was no column info garnered from grid definition.");
00416 reset_data_fields(df_storage_dir, data_source);
00417 continue;
00418 }
00419
00420 istring key_column = ini.load("rows", "key_column", "");
00421 if (!key_column) {
00422 LOG("ERROR: the key column for row queries seems absent from grid definition.");
00423 reset_data_fields(df_storage_dir, data_source);
00424 continue;
00425 }
00426
00427
00428 string_array row_keys;
00429 curr_item = "bogus";
00430 for (int i = 0; curr_item != ""; i++) {
00431 curr_item = ini.load("rows", isprintf("rowkey_%d", i), "");
00432 if (curr_item.t()) {
00433 row_keys += curr_item;
00434
00435 }
00436 }
00437 if (!row_keys.length()) {
00438 LOG("ERROR: there was no row key info found in grid definition.");
00439 reset_data_fields(df_storage_dir, data_source);
00440 continue;
00441 }
00442
00443
00444 istring query;
00445
00446 query += "select ";
00447 bool saw_key_col = false;
00448 for (int i = 0; i < col_names.length(); i++) {
00449 if (i) query += ", ";
00450 istring safe_colname = col_names[i];
00451 common_database_support::nerf_special_characters(safe_colname,
00452 DB_FREETDS_SQL_ESCAPE_CHARACTER);
00453 query += safe_colname;
00454
00455
00456 if (col_names[i] == key_column) saw_key_col = true;
00457 }
00458
00459 istring temp_key_col = key_column;
00460 common_database_support::nerf_special_characters(temp_key_col,
00461 DB_FREETDS_SQL_ESCAPE_CHARACTER);
00462
00463 if (!saw_key_col) {
00464
00465
00466 query += istring(", ") + temp_key_col;
00467 }
00468
00469 query += " from ";
00470 query += persistent_login_info.table_name;
00471 query += " where ";
00472 query += temp_key_col;
00473 query += " in (";
00474 for (int i = 0; i < row_keys.length(); i++) {
00475 if (i) query += ", ";
00476 istring temp_rowkey = row_keys[i];
00477 common_database_support::nerf_special_characters(temp_rowkey,
00478 DB_FREETDS_SQL_ESCAPE_CHARACTER);
00479
00480 query += istring("'") + temp_rowkey + "'";
00481 }
00482 query += ")";
00483
00484
00485
00486 bool worked = process_query_as_df(query, data_source, df_storage_dir,
00487 key_column, row_keys, multipliers);
00488 if (!worked) {
00489 LOG("ERROR: failed to process the query and acquire data fields.");
00490 reset_data_fields(df_storage_dir, data_source);
00491 }
00492 dbcanquery(_querier.current_db());
00493 }
00494 }
00495