26 #include <boost/program_options.hpp>
27 #include <boost/program_options/positional_options.hpp>
36 #include "binary_reader.hpp"
38 using namespace swarm;
41 namespace po = boost::program_options;
43 using boost::shared_ptr;
44 using gpulog::logrecord;
// Upper bound on the number of records converted by the loop in main;
// replaced by the value of --number when that option is given.
46 int recordsLimit = 1000;
// Cache size in bytes (64 MiB) passed to set_cachesize for the primary
// database; each secondary index is given a quarter of it.
48 const int CACHESIZE = 1024*1024*64 ;
// Integer type used for record numbers and for byte positions in the input.
50 typedef long long idx_t;
// Key type of the primary database. Other code in this file reads the fields
// time, system_id, event_id and recno from it.
52 struct logdb_primary_key {
// Parsed command-line options, filled in by parse_commandline_and_config.
60 po::variables_map argvars_map;
// File names taken from --input and --output.
61 string inputFileName, outputFileName;
// Initial value of the record counter that main stores in each key (--recno).
65 idx_t start_recno = 0;
// Byte offset handed to input_reader.seek() when greater than zero (--position).
68 idx_t starting_position = 0;
// Parses the command line with Boost.Program_options into argvars_map and
// copies the recognised options into the file-level settings: DEBUG_LEVEL,
// inputFileName, outputFileName, recordsLimit, start_recno and
// starting_position.
//
// argc, argv: the arguments received by main, passed through unchanged.
//
// When --help is present the option summary is printed to stdout and the
// process exits with status 1.
70 void parse_commandline_and_config(
int argc,
char* argv[]){
// Header text shown above the option list in the --help output.
72 po::options_description desc(
"Usage:\n \tlog2db [options]\nOptions");
// Option table: each entry is "long,short" name, optional value type, help text.
74 (
"input,i", po::value<std::string>(),
"Binary log file from swarm binary_writer")
75 (
"output,o", po::value<std::string>(),
"Name of the database output file")
76 (
"number,n", po::value<int>(),
"Number of records to convert")
77 (
"verbose,v", po::value<int>(),
"Verbosity level")
78 (
"position,p", po::value<idx_t>(),
"Absolute position in the input file where the conversion should begin (in bytes)")
79 (
"recno,r", po::value<idx_t>(),
"Starting record number")
// The remaining options are plain switches without a value.
80 (
"dump,d",
"Dump all the records up to the number")
81 (
"quiet,q",
"Suppress all messages")
82 (
"help,h",
"Help message")
// vm aliases the file-level map, so the parsed values stay available after
// this function returns.
87 po::variables_map &vm = argvars_map;
88 po::store(po::command_line_parser(argc, argv).
89 options(desc).run(), vm);
// --help short-circuits everything else.
94 if (vm.count(
"help")) { std::cout << desc <<
"\n"; exit(1); }
// --quiet is evaluated after --verbose, so it wins when both are given.
95 if (vm.count(
"verbose") ) DEBUG_LEVEL = vm[
"verbose"].as<int>();
96 if (vm.count(
"quiet") ) DEBUG_LEVEL = -1;
// Input file name; the message below is presumably the branch taken when
// --input is missing (TODO confirm against the surrounding condition).
99 inputFileName = vm[
"input"].as<string>();
101 cerr <<
"Name of input file is required" << endl;
// Output file name, with the matching message for a missing --output.
105 if(vm.count(
"output"))
106 outputFileName = vm[
"output"].as<string>();
108 cerr <<
"Name of output file is required" << endl;
// Optional settings: each keeps its file-level default unless the option
// was supplied.
112 if(vm.count(
"number"))
113 recordsLimit = vm[
"number"].as<int>();
115 if(vm.count(
"recno"))
116 start_recno = vm[
"recno"].as<idx_t>();
118 if(vm.count(
"position"))
119 starting_position = vm[
"position"].as<idx_t>();
// Fill the Dbt `data` with a heap-allocated copy of `t`, sized sizeof(T).
// NOTE(review): Berkeley DB documents DB_DBT_APPMALLOC as marking memory that
// the library releases with free(), while this buffer is created with `new` —
// confirm that the allocation/deallocation pairing is intended.
133 data->set_flags(DB_DBT_APPMALLOC);
134 data->set_size(
sizeof(T));
135 data->set_data(
new T(t));
// Decodes the log record stored at `ptr`; main calls it to fill the time and
// system_id fields of the primary key.
//
// ptr:  start of the raw record bytes.
// size: record length the caller expects; only used by the assert below.
// time: output reference for the record's time.
// sys:  output reference for the record's system id.
//
// NOTE(review): the meaning of the bool result is not evident from the
// visible code, and main does not check it.
139 bool extract_from_ptr(
void* ptr,
size_t size,
double& time,
int& sys){
// Wrap the raw bytes in a logrecord view.
140 logrecord l((
char*)ptr);
// Sanity check (compiled out when NDEBUG is defined): the length stored in
// the record must match the length supplied by the caller.
141 assert(l.len() == size);
// Case labels that share one branch — presumably the switch is over the
// record's message id; TODO confirm.
145 case 1:
case 2:
case 11:
case 15:
case 16:
// A quiet NaN is written to `time` on this path — presumably for records
// that carry no timestamp; TODO confirm.
151 time = numeric_limits<double>::quiet_NaN();
// View the bytes of the primary key Dbt as a logdb_primary_key (by reference,
// no copy is made).
167 logdb_primary_key& pkey = *(logdb_primary_key*) key->get_data();
// Secondary-key callback that main passes to primary.associate() for
// event_idx.
//
// secondary: handle of the secondary database the key is produced for.
// key, data: primary key and primary record of the entry being indexed.
// result:    Dbt that receives the secondary key.
172 int lr_extract_evtid(Db *secondary,
const Dbt *key,
const Dbt *data, Dbt *result) {
// View the primary key bytes as a logdb_primary_key (by reference, no copy).
173 logdb_primary_key& pkey = *(logdb_primary_key*) key->get_data();
// Secondary-key callback that main passes to primary.associate() for
// time_idx.
//
// secondary: handle of the secondary database the key is produced for.
// key, data: primary key and primary record of the entry being indexed.
// result:    Dbt that receives the secondary key.
178 int lr_extract_time(Db *secondary,
const Dbt *key,
const Dbt *data, Dbt *result) {
// View the primary key bytes as a logdb_primary_key (by reference, no copy).
179 logdb_primary_key& pkey = *(logdb_primary_key*) key->get_data();
// Ordering of two keys k1 and k2: the sizes are compared first; keys that
// both have exactly sizeof(logdb_primary_key) bytes are then compared field
// by field in the order time, system_id, event_id, recno.
191 const size_t len =
sizeof(logdb_primary_key);
// Keys of different length are distinguished by size alone.
192 if(k1->size < k2->size)
194 else if(k1->size > k2->size)
// Only interpret the bytes as logdb_primary_key when both sizes match it.
197 if( (k1->size == len) && (k2->size == len) ) {
199 logdb_primary_key& a = *(logdb_primary_key*)(k1->data);
200 logdb_primary_key& b = *(logdb_primary_key*)(k2->data);
// Lexicographic comparison: -1 when a sorts first, 1 when b sorts first.
// If either time is NaN, both time comparisons are false and the ordering
// falls through to system_id.
202 if(a.time < b.time)
return -1;
203 else if(a.time > b.time)
return 1;
204 else if(a.system_id < b.system_id)
return -1;
205 else if(a.system_id > b.system_id)
return 1;
206 else if(a.event_id < b.event_id)
return -1;
207 else if(a.event_id > b.event_id)
return 1;
208 else if(a.recno < b.recno)
return -1;
209 else if(a.recno > b.recno)
return 1;
// Key comparison function that main installs on time_idx through
// set_bt_compare().
//
// db:     database handle supplied by the caller of the comparison.
// k1, k2: the two keys to order.
216 int compare_time(DB* db,
const DBT *k1,
const DBT*
k2){
// Keys of different length are distinguished by size alone.
217 if(k1->size < k2->size)
219 else if(k1->size > k2->size)
// Both keys are read as double only when each is 8 bytes long — 8 is
// presumably sizeof(double) on the supported platforms; TODO confirm.
222 if( (k1->size == 8) && (k2->size == 8) ) {
223 double& a = *(
double*)(k1->data);
224 double& b = *(
double*)(k2->data);
// 1 means k1 sorts after k2.
226 else if(a > b)
return 1;
261 using gpulog::logrecord;
// Becomes true once a signal has been reported; the conversion loop in main
// tests it before every record and stops when it is set. Declared volatile,
// presumably because it is written from a signal handler — TODO confirm.
265 volatile bool interruption_received =
false;
// Report the signal number on stdout, then raise the interruption flag.
// NOTE(review): this looks like the body of a signal handler; if so, printf
// is not async-signal-safe — confirm.
274 printf(
"Received signal %d. interrupting execution\n", signum);
// The flag is tested before it is set, which distinguishes a repeated signal
// from the first one.
275 if(interruption_received)
278 interruption_received =
true;
// Program entry point: reads records from the binary log named by --input and
// stores them in a Berkeley DB primary database, with secondary index files
// derived from the --output name.
282 int main(
int argc,
char* argv[]){
// Fill the file-level settings from the command line.
283 parse_commandline_and_config(argc,argv);
// One handle for the primary table and one per secondary index.
289 Db primary(NULL, 0), system_idx(NULL, 0), time_idx(NULL, 0), event_idx(NULL, 0);
// The reader is bound to the stream `input`, which is opened in binary mode
// right after; validate() is checked before any record is read.
290 binary_reader input_reader(input);
292 input.open(inputFileName.c_str(), ios::binary);
293 if(!input_reader.validate())
294 throw std::runtime_error(
"The input file is not valid");
// Report where the conversion starts.
298 cout <<
"Starting to convert at position " << starting_position <<
299 " with record number " << start_recno << endl;
// Running record number copied into every primary key.
301 idx_t current_recno = start_recno;
// Seek only when a positive start offset was requested.
306 if(starting_position > 0)
307 input_reader.seek( starting_position );
// Primary database: B-tree in <output>.p.db, created if it does not exist.
318 primary.set_cachesize(0,CACHESIZE,0);
320 primary.open(NULL, (outputFileName+
".p.db").c_str(), NULL, DB_BTREE, DB_CREATE, 0);
// Secondary index on system id: B-tree in <output>.sys.db; DB_DUP | DB_DUPSORT
// allows several records per key and keeps the duplicates sorted.
324 system_idx.set_cachesize(0,CACHESIZE/4,0);
325 system_idx.set_flags(DB_DUP | DB_DUPSORT);
326 system_idx.open(NULL, (outputFileName+
".sys.db").c_str(), NULL, DB_BTREE, DB_CREATE , 0);
// Secondary index on time: same setup, plus compare_time as key comparison.
331 time_idx.set_cachesize(0,CACHESIZE/4,0);
332 time_idx.set_flags(DB_DUP | DB_DUPSORT);
333 time_idx.set_bt_compare(compare_time);
334 time_idx.open(NULL, (outputFileName+
".time.db").c_str(), NULL, DB_BTREE, DB_CREATE , 0);
// Secondary index on event id.
336 event_idx.set_cachesize(0,CACHESIZE/4,0);
337 event_idx.set_flags(DB_DUP | DB_DUPSORT);
338 event_idx.open(NULL, (outputFileName+
".evt.db").c_str(), NULL, DB_BTREE, DB_CREATE , 0);
// Attach the time and event-id indexes to the primary database, each with its
// key-extraction callback. DB_IMMUTABLE_KEY is passed for both.
344 primary.associate(NULL, &time_idx , &lr_extract_time , DB_IMMUTABLE_KEY);
345 primary.associate(NULL, &event_idx , &lr_extract_evtid, DB_IMMUTABLE_KEY);
// `key` refers to pkey by address, so updating pkey's fields inside the loop
// changes the key bytes written by put() without rebuilding the Dbt.
348 logdb_primary_key pkey;
349 Dbt key(&pkey,
sizeof(pkey)),data;
// Convert at most recordsLimit records; stop early once an interruption has
// been flagged.
352 for(
int i=0; (i < recordsLimit) && !interruption_received; i++) {
354 logrecord l = input_reader.next();
// Build the primary key: event id from the record's message id, time and
// system id decoded from the record bytes, then the running record number.
// The bool returned by extract_from_ptr is not checked here.
365 pkey.event_id = l.msgid();
366 extract_from_ptr((
void*)l.ptr, l.len(), pkey.time, pkey.system_id);
367 pkey.recno = current_recno;
// The stored value is the raw record itself (pointer and length taken from
// the logrecord); the return code of put() is ignored.
369 data.set_data((
void*)l.ptr);
370 data.set_size(l.len());
371 primary.put(NULL,&key,&data,0);
// Report the reader position and record number reached.
379 cout <<
"Processed the input file up to position " << input_reader.tellg() <<
380 " and record number " << current_recno << endl;
392 if(interruption_received)