Swarm-NG  1.1
bdb_database.cpp
#include "../common.hpp"

#include <libgen.h>
#include <unistd.h>
#include <cstdlib>
#include <limits>
#include <new>
#include <boost/concept_check.hpp>

#include "bdb_database.hpp"
9 
10 namespace swarm { namespace log {
11 
12 using gpulog::logrecord;
13 
// Cache size in bytes (64 MiB) passed to DbEnv::set_cachesize() by
// createDefaultEnv().
const int CACHESIZE = 64 * 1024 * 1024;

// Version strings stored in the "metadata" database by fillVersionInfo().
// validateVersionInfo() compares the stored file format version against
// fileFormatVersion.
const char* fileFormatVersion = "1";
const char* swarmngVersion = "1.1";
18 
19 
20 bool operator <(const pkey_t& a, const pkey_t& b){
21  if(a.time == b.time)
22  return a.system_event_id < b.system_event_id;
23  else
24  return a.time < b.time;
25 }
26 
27 
34 template<typename T>
35 void put_in_dbt(const T& t, Dbt* data){
36  data->set_flags(DB_DBT_APPMALLOC);
37  data->set_size(sizeof(T));
38  data->set_data(new T(t));
39 }
40 
41 
51 int lr_extract_sysid(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
52  pkey_t& pkey = *(pkey_t*) key->get_data();
53  put_in_dbt(pkey.system_id(), result);
54  return 0;
55 }
56 
57 int lr_extract_evtid(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
58  pkey_t& pkey = *(pkey_t*) key->get_data();
59  put_in_dbt(pkey.event_id(), result);
60  return 0;
61 }
62 
63 int lr_extract_time(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
64  pkey_t& pkey = *(pkey_t*) key->get_data();
65  put_in_dbt(pkey.time , result);
66  return 0;
67 }
68 
69 
70 
74 template<typename T>
75 int bdb_compare(DB* db, const DBT *k1, const DBT* k2){
76  if(k1->size < k2->size)
77  return -1;
78  else if(k1->size > k2->size)
79  return 1;
80  else{
81  if( (k1->size == sizeof(T)) && (k2->size == sizeof(T)) ) {
82  T& a = *(T*)(k1->data);
83  T& b = *(T*)(k2->data);
84  if(a < b) return -1;
85  else if(b < a) return 1;
86  else return 0;
87  }else{
88  return 0;
89  }
90  }
91 }
92 
// File mode (octal 0600) passed as the `mode` argument of every
// DbEnv::open() / Db::open() call in this file.
const int create_mode = 0600;
94 
95 void bdb_database::openEnv(const std::string& basedir){
96  std::cerr << "Initializing BDB environment in `" << basedir << "`" << std::endl;
97  env->open(basedir.c_str(),DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL,create_mode);
98 }
99 
100 DbEnv* bdb_database::createDefaultEnv(){
101  DbEnv* env = new DbEnv(0);
102  env->set_cachesize(0,CACHESIZE,0);
103  return env;
104 }
105 
// Returns the directory component of path `s`, as computed by dirname(3).
// dirname() is allowed to modify the buffer it is given, so it operates on
// a temporary C-string copy of `s` that is released before returning.
std::string directory_name(const std::string& s){
    char* scratch = strdup(s.c_str());
    std::string parent = dirname(scratch);
    free(scratch);
    return parent;
}
114 
// Returns the final component of path `s`, as computed by basename(3).
// basename() is allowed to modify the buffer it is given, so it operates on
// a temporary C-string copy of `s` that is released before returning.
std::string base_name(const std::string& s){
    char* scratch = strdup(s.c_str());
    std::string leaf = basename(scratch);
    free(scratch);
    return leaf;
}
123 
124 void bdb_database::openInternal(const std::string& pathName, int open_mode){
125 
126  openEnv(directory_name(pathName));
127  std::string fileName = base_name(pathName);
128 
129  const char * fn = fileName.c_str();
130 
131  metadata.open(NULL, fn, "metadata", DB_BTREE, open_mode, create_mode);
132 
133  primary.set_bt_compare(bdb_compare<pkey_t>);
134  primary.open(NULL, fn, "primary", DB_BTREE, open_mode, create_mode);
135 
136  // Open up the system index database, it has to support
137  // duplicates and it is given a smaller cache size
138  // system_idx.set_cachesize(0,CACHESIZE,0);
139  system_idx.set_flags(DB_DUP | DB_DUPSORT);
140  system_idx.set_bt_compare(bdb_compare<sysid_t>);
141  system_idx.set_dup_compare(bdb_compare<pkey_t>);
142  system_idx.open(NULL, fn, "system_idx", DB_BTREE, open_mode , create_mode);
143 
144  // Open up the time index database, it has to support
145  // duplicates because our index is not a unique index and
146  // it takes a smaller cache size
147  // time_idx.set_cachesize(0,CACHESIZE,0);
148  time_idx.set_flags(DB_DUP | DB_DUPSORT);
149  time_idx.set_bt_compare(bdb_compare<float>);
150  time_idx.set_dup_compare(bdb_compare<pkey_t>);
151  time_idx.open(NULL, fn, "time_idx", DB_BTREE, open_mode , create_mode);
152 
153  // event_idx.set_cachesize(0,CACHESIZE,0);
154  event_idx.set_flags(DB_DUP | DB_DUPSORT);
155  event_idx.set_bt_compare(bdb_compare<evtid_t>);
156  event_idx.set_dup_compare(bdb_compare<pkey_t>);
157  event_idx.open(NULL, fn, "event_idx", DB_BTREE, open_mode , create_mode);
158 
159  // Associate the primary table with the indices
160  // the lr_extract_* is the function that defines
161  // the indexing scheme
162  primary.associate(NULL, &system_idx, &lr_extract_sysid, DB_IMMUTABLE_KEY);
163  primary.associate(NULL, &time_idx , &lr_extract_time , DB_IMMUTABLE_KEY);
164  primary.associate(NULL, &event_idx , &lr_extract_evtid, DB_IMMUTABLE_KEY);
165 }
166 
167 void bdb_database::openForReading(const std::string& fileName) {
168  openInternal(fileName, DB_RDONLY);
169  validateVersionInfo();
170 }
171 
172 void bdb_database::create(const std::string& fileName){
173  openInternal(fileName, DB_CREATE);
174  fillVersionInfo();
175 }
176 
177 void bdb_database::createEmpty(const std::string& fileName){
178  openInternal(fileName, DB_CREATE );
179  fillVersionInfo();
180 }
181 
182 void bdb_database::put(logrecord& lr){
183 
184  double time; int sys;
185 
186  // Based on the implementation in query.cpp
187  switch(lr.msgid()){
188  case 1: case 2: case 11: case 15: case 16:
189  lr >> time >> sys;
190  break;
191 
192  default:
193  sys = 0;
194  time = std::numeric_limits<double>::quiet_NaN();
195  break;
196  }
197 
198  pkey_t pkey( (float) time, sys, lr.msgid());
199 
200  Dbt key(&pkey,sizeof(pkey));
201  Dbt data((void*)lr.ptr,lr.len());
202  primary.put(NULL,&key,&data,0);
203 }
204 
205 void bdb_database::addMetaData(const std::string name, const std::string value){
206  Dbt key((void *)name.data(),name.size()), data((void*)value.data(), value.size());
207  metadata.put(NULL,&key,&data,0);
208 }
209 std::string bdb_database::getMetaData(const std::string name) {
210  Dbt key((void*)name.data(),name.size()), data;
211  data.set_flags(DB_DBT_MALLOC);
212 
213  metadata.get(NULL,&key,&data,0);
214 
215  // Create a new string and free the buffer
216  size_t n = data.get_size();
217  char* ptr = (char*) data.get_data();
218  std::string value(data.get_size(),0);
219  std::copy(ptr, ptr+n, value.begin());
220  free(ptr); data.set_data(0);
221 
222  return value;
223 }
224 
225 void bdb_database::fillVersionInfo() {
226  addMetaData("fileFormatVersion", fileFormatVersion);
227  addMetaData("swarmngVersion", swarmngVersion);
228 }
229 bool bdb_database::validateVersionInfo() {
230  return (getMetaData("fileFormatVersion") == fileFormatVersion ) ;
231  // && (getMetaData("swarmngVersion") == swarmngVersion );
232 }
233 
234 Pprimary_cursor_t bdb_database::primary_cursor(){
235  shared_ptr<primary_cursor_t> c(new primary_cursor_t);
236  primary.cursor(0,&c->_c,0);
237  return c;
238 }
239 
240 void bdb_database::close(){
241  event_idx.close(0);
242  time_idx.close(0);
243  system_idx.close(0);
244  primary.close(0);
245 }
246 
247 
248 
249 void primary_cursor_t::close(){
250  _c->close();
251 }
252 
253 bool primary_cursor_t::get(pkey_t& key, lrw_t& lr, uint32_t flags){
254  Dbt k;
255  Dbt d;
256  k.set_data(&key);
257  d.set_data(lr.ptr);
258  k.set_ulen(sizeof(key));
259  d.set_ulen(lr.len);
260  d.set_flags(DB_DBT_USERMEM);
261  k.set_flags(DB_DBT_USERMEM);
262  return _c->get(&k,&d,flags) == 0;
263 }
264 
265 bool primary_cursor_t::position_at(pkey_t& key,lrw_t& lr){
266  Dbt k;
267  Dbt d;
268  k.set_data(&key);
269  d.set_data(lr.ptr);
270  k.set_ulen(sizeof(key));
271  d.set_ulen(lr.len);
272  d.set_flags(DB_DBT_USERMEM);
273  k.set_flags(DB_DBT_USERMEM);
274  k.set_size(sizeof(key));
275  return _c->get(&k,&d,DB_SET_RANGE) == 0;
276 }
277 
278 
279 void bdb_database::flush()
280 {
281  metadata.sync(0);
282  primary.sync(0);time_idx.sync(0); event_idx.sync(0); system_idx.sync(0);
283 }
284 
285 
286 } } // close namespace log :: swarm