module rocksdb.database;

import std.conv : to;
import std.file : isDir, exists;
import std.array : array;
import std.string : fromStringz, toStringz;
import std.format : format;

import core.memory : GC;
import core.stdc.stdlib : free;
import core.stdc.string : strlen;

import rocksdb.batch,
       rocksdb.options,
       rocksdb.iterator,
       rocksdb.queryable,
       rocksdb.comparator,
       rocksdb.columnfamily;

// Raw bindings to the subset of the RocksDB C API used by this module.
extern (C) {
  struct rocksdb_t {}

  void rocksdb_put(rocksdb_t*, const rocksdb_writeoptions_t*, const char*, size_t, const char*, size_t, char**);
  void rocksdb_put_cf(rocksdb_t*, const rocksdb_writeoptions_t*, rocksdb_column_family_handle_t*, const char*, size_t, const char*, size_t, char**);

  char* rocksdb_get(rocksdb_t*, const rocksdb_readoptions_t*, const char*, size_t, size_t*, char**);
  char* rocksdb_get_cf(rocksdb_t*, const rocksdb_readoptions_t*, rocksdb_column_family_handle_t*, const char*, size_t, size_t*, char**);

  void rocksdb_multi_get(rocksdb_t*, const rocksdb_readoptions_t*, size_t, const char**, const size_t*, char**, size_t*, char**);
  // The C API expects an ARRAY of column family handles here (one per key),
  // not a single handle.
  void rocksdb_multi_get_cf(rocksdb_t*, const rocksdb_readoptions_t*, const rocksdb_column_family_handle_t**, size_t, const char**, const size_t*, char**, size_t*, char**);

  void rocksdb_delete(rocksdb_t*, const rocksdb_writeoptions_t*, const char*, size_t, char**);
  void rocksdb_delete_cf(rocksdb_t*, const rocksdb_writeoptions_t*, rocksdb_column_family_handle_t*, const char*, size_t, char**);

  void rocksdb_write(rocksdb_t*, const rocksdb_writeoptions_t*, rocksdb_writebatch_t*, char**);

  rocksdb_t* rocksdb_open(const rocksdb_options_t*, const char*, char**);
  rocksdb_t* rocksdb_open_column_families(const rocksdb_options_t*, const char*, int, const char**, const rocksdb_options_t**, rocksdb_column_family_handle_t**, char**);

  void rocksdb_close(rocksdb_t*);
}

/**
 * Converts a RocksDB error string into a D exception.
 *
 * Does nothing when `err` is null. Otherwise throws an `Exception` whose
 * message is "Error: " followed by the C string, and releases the C buffer
 * (the RocksDB C API hands error messages out as malloc()ed strings).
 *
 * Params:
 *   err = error pointer filled in by a RocksDB C API call, or null
 *
 * Throws: Exception when `err` is not null.
 */
void ensureRocks(char* err) {
  if (err) {
    // format() builds a fresh GC string, so the C buffer is no longer needed
    // once the message exists.
    auto message = format("Error: %s", fromStringz(err));
    free(err);
    throw new Exception(message);
  }
}

/**
 * Copies a malloc()ed buffer returned by the RocksDB C API into GC-owned
 * memory and frees the original.
 *
 * Params:
 *   buffer = buffer returned by RocksDB, or null
 *   len    = number of valid bytes in `buffer`
 *
 * Returns: null when `buffer` is null, otherwise a GC-owned copy of the bytes.
 */
private ubyte[] adoptBuffer(char* buffer, size_t len) {
  if (buffer is null) {
    return null;
  }

  scope (exit) free(buffer);

  // Allocate at least one byte so that an empty (but present) value still
  // yields a non-null slice and stays distinguishable from a null result.
  auto storage = new ubyte[](len > 0 ? len : 1);
  storage[0 .. len] = (cast(ubyte*)buffer)[0 .. len];
  return storage[0 .. len];
}

/**
 * An open RocksDB database together with its column families and the default
 * read/write options used when a call does not supply its own.
 */
class Database {
  mixin Getable;
  mixin Putable;
  mixin Removeable;

  /// Underlying C handle; null once the database has been closed.
  rocksdb_t* db;

  DBOptions opts;
  WriteOptions writeOptions;
  ReadOptions readOptions;

  /// Column families opened or created through this instance, keyed by name.
  ColumnFamily[string] columnFamilies;

  /**
   * Opens the database at `path`.
   *
   * If `path` is an existing directory its column families are listed first
   * and all of them are opened; each one uses the options found under its
   * name in `columnFamilies`, falling back to `opts`.
   *
   * Params:
   *   opts           = database options
   *   path           = filesystem location of the database
   *   columnFamilies = optional per-column-family options keyed by name
   *
   * Throws: Exception when RocksDB reports an error.
   */
  this(DBOptions opts, string path, DBOptions[string] columnFamilies = null) {
    char* err = null;
    this.opts = opts;

    string[] existingColumnFamilies;

    // If there is an existing database we can check for existing column families
    if (exists(path) && isDir(path)) {
      existingColumnFamilies = Database.listColumnFamilies(opts, path);
    }

    if (columnFamilies || existingColumnFamilies.length >= 1) {
      // NOTE(review): when `columnFamilies` is given but nothing exists on
      // disk yet, the name list below is empty - confirm RocksDB accepts an
      // open with zero column families.
      immutable(char*)[] columnFamilyNames;
      rocksdb_options_t*[] columnFamilyOptions;

      foreach (k; existingColumnFamilies) {
        columnFamilyNames ~= toStringz(k);

        if ((k in columnFamilies) !is null) {
          columnFamilyOptions ~= columnFamilies[k].opts;
        } else {
          columnFamilyOptions ~= opts.opts;
        }
      }

      rocksdb_column_family_handle_t*[] result;
      result.length = columnFamilyNames.length;

      this.db = rocksdb_open_column_families(
        opts.opts,
        toStringz(path),
        cast(int)columnFamilyNames.length,
        columnFamilyNames.ptr,
        columnFamilyOptions.ptr,
        result.ptr,
        &err);

      // Check for failure BEFORE wrapping the handles: after a failed open
      // the entries of `result` are not valid column family handles.
      err.ensureRocks();

      foreach (idx, handle; result) {
        this.columnFamilies[existingColumnFamilies[idx]] = new ColumnFamily(
          this,
          existingColumnFamilies[idx],
          handle,
        );
      }
    } else {
      this.db = rocksdb_open(opts.opts, toStringz(path), &err);
      err.ensureRocks();
    }

    this.writeOptions = new WriteOptions;
    this.readOptions = new ReadOptions;
  }

  /// Destroys every known column family handle and closes the database.
  ~this() {
    if (this.db) {
      foreach (k, v; this.columnFamilies) {
        rocksdb_column_family_handle_destroy(v.cf);
      }

      rocksdb_close(this.db);
      // Do not keep a dangling handle around after closing.
      this.db = null;
    }
  }

  /**
   * Creates a new column family and registers it in `columnFamilies`.
   *
   * Params:
   *   name = name of the new column family
   *   opts = options for it; the database options are used when null
   *
   * Returns: the wrapper for the new column family.
   * Throws: Exception when RocksDB reports an error.
   */
  ColumnFamily createColumnFamily(string name, DBOptions opts = null) {
    char* err = null;

    auto cfh = rocksdb_create_column_family(this.db, (opts ? opts : this.opts).opts, toStringz(name), &err);
    err.ensureRocks();

    this.columnFamilies[name] = new ColumnFamily(this, name, cfh);
    return this.columnFamilies[name];
  }

  /**
   * Lists the names of the column families stored in the database at `path`.
   *
   * Throws: Exception when RocksDB reports an error.
   */
  static string[] listColumnFamilies(DBOptions opts, string path) {
    char* err = null;
    size_t numColumnFamilies;

    char** columnFamilies = rocksdb_list_column_families(
      opts.opts,
      toStringz(path),
      &numColumnFamilies,
      &err);

    err.ensureRocks();

    string[] result = new string[](numColumnFamilies);

    // Copy every name into D strings before the C list is destroyed
    foreach (i; 0 .. numColumnFamilies) {
      result[i] = fromStringz(columnFamilies[i]).to!string;
    }

    rocksdb_list_column_families_destroy(columnFamilies, numColumnFamilies);

    return result;
  }

  /**
   * Reads the value stored under `key`.
   *
   * Params:
   *   key    = raw key bytes
   *   family = column family to read from, or null for the default one
   *   opts   = read options; the instance defaults are used when null
   *
   * Returns: a GC-owned copy of the value, or null when RocksDB returned no
   *          buffer (the C API documents a NULL return for a missing key).
   * Throws: Exception when RocksDB reports an error.
   */
  ubyte[] getImpl(ubyte[] key, ColumnFamily family, ReadOptions opts = null) {
    size_t len;
    char* err;
    char* value;
    auto readOpts = (opts ? opts : this.readOptions).opts;

    if (family) {
      value = rocksdb_get_cf(
        this.db,
        readOpts,
        family.cf,
        cast(char*)key.ptr,
        key.length,
        &len,
        &err);
    } else {
      value = rocksdb_get(
        this.db,
        readOpts,
        cast(char*)key.ptr,
        key.length,
        &len,
        &err);
    }

    if (err) {
      // Release whatever was returned alongside the error before throwing.
      free(value);
      err.ensureRocks();
    }

    // The buffer is malloc()ed by RocksDB: copy it and free it instead of
    // leaking it for the lifetime of the process.
    return adoptBuffer(value, len);
  }

  /**
   * Stores `value` under `key`.
   *
   * Params:
   *   key    = raw key bytes
   *   value  = raw value bytes
   *   family = column family to write to, or null for the default one
   *   opts   = write options; the instance defaults are used when null
   *
   * Throws: Exception when RocksDB reports an error.
   */
  void putImpl(ubyte[] key, ubyte[] value, ColumnFamily family, WriteOptions opts = null) {
    char* err;
    auto writeOpts = (opts ? opts : this.writeOptions).opts;

    if (family) {
      rocksdb_put_cf(this.db,
        writeOpts,
        family.cf,
        cast(char*)key.ptr, key.length,
        cast(char*)value.ptr, value.length,
        &err);
    } else {
      rocksdb_put(this.db,
        writeOpts,
        cast(char*)key.ptr, key.length,
        cast(char*)value.ptr, value.length,
        &err);
    }

    err.ensureRocks();
  }

  /**
   * Deletes the entry stored under `key`.
   *
   * Params:
   *   key    = raw key bytes
   *   family = column family to delete from, or null for the default one
   *   opts   = write options; the instance defaults are used when null
   *
   * Throws: Exception when RocksDB reports an error.
   */
  void removeImpl(ubyte[] key, ColumnFamily family, WriteOptions opts = null) {
    char* err;
    auto writeOpts = (opts ? opts : this.writeOptions).opts;

    if (family) {
      rocksdb_delete_cf(
        this.db,
        writeOpts,
        family.cf,
        cast(char*)key.ptr,
        key.length,
        &err);
    } else {
      rocksdb_delete(
        this.db,
        writeOpts,
        cast(char*)key.ptr,
        key.length,
        &err);
    }

    err.ensureRocks();
  }

  /**
   * Shared implementation of `multiGet` and `multiGetString`.
   *
   * Params:
   *   ckeys      = pointers to the key bytes
   *   ckeysSizes = length of each key, parallel to `ckeys`
   *   family     = column family to read from, or null for the default one
   *   opts       = read options; the instance defaults are used when null
   *
   * Returns: one GC-owned value per key, null where RocksDB returned no buffer.
   * Throws: Exception carrying the first error reported for any key.
   */
  private ubyte[][] multiGetRaw(char*[] ckeys, size_t[] ckeysSizes, ColumnFamily family, ReadOptions opts) {
    immutable count = ckeys.length;
    auto result = new ubyte[][](count);

    if (count == 0) {
      return result;
    }

    auto vals = new char*[](count);
    auto valsSizes = new size_t[](count);
    auto errs = new char*[](count);
    auto readOpts = (opts ? opts : this.readOptions).opts;

    if (family) {
      // rocksdb_multi_get_cf reads one handle per key, so the single family
      // has to be repeated for every key.
      auto families = new rocksdb_column_family_handle_t*[](count);
      families[] = family.cf;

      rocksdb_multi_get_cf(
        this.db,
        readOpts,
        families.ptr,
        count,
        ckeys.ptr,
        ckeysSizes.ptr,
        vals.ptr,
        valsSizes.ptr,
        errs.ptr);
    } else {
      rocksdb_multi_get(
        this.db,
        readOpts,
        count,
        ckeys.ptr,
        ckeysSizes.ptr,
        vals.ptr,
        valsSizes.ptr,
        errs.ptr);
    }

    // Take ownership of every returned buffer first so that nothing leaks if
    // one of the lookups failed; only then report the first error.
    char* firstErr = null;
    foreach (idx; 0 .. count) {
      result[idx] = adoptBuffer(vals[idx], valsSizes[idx]);

      if (errs[idx] is null) {
        continue;
      }

      if (firstErr is null) {
        firstErr = errs[idx];
      } else {
        free(errs[idx]);
      }
    }

    firstErr.ensureRocks();
    return result;
  }

  /**
   * Reads several keys in one call.
   *
   * Params:
   *   keys   = raw keys to look up
   *   family = column family to read from, or null for the default one
   *   opts   = read options; the instance defaults are used when null
   *
   * Returns: one value per key, in the same order as `keys`.
   * Throws: Exception when RocksDB reports an error for any key.
   */
  ubyte[][] multiGet(ubyte[][] keys, ColumnFamily family = null, ReadOptions opts = null) {
    auto ckeys = new char*[](keys.length);
    auto ckeysSizes = new size_t[](keys.length);

    foreach (idx, key; keys) {
      ckeys[idx] = cast(char*)key.ptr;
      ckeysSizes[idx] = key.length;
    }

    return this.multiGetRaw(ckeys, ckeysSizes, family, opts);
  }

  /**
   * String flavour of `multiGet`: keys and values are passed as strings.
   *
   * Params:
   *   keys   = keys to look up
   *   family = column family to read from, or null for the default one
   *   opts   = read options; the instance defaults are used when null
   *
   * Returns: one value per key, in the same order as `keys`.
   * Throws: Exception when RocksDB reports an error for any key.
   */
  string[] multiGetString(string[] keys, ColumnFamily family = null, ReadOptions opts = null) {
    auto ckeys = new char*[](keys.length);
    auto ckeysSizes = new size_t[](keys.length);

    foreach (idx, key; keys) {
      ckeys[idx] = cast(char*)key.ptr;
      ckeysSizes[idx] = key.length;
    }

    auto raw = this.multiGetRaw(ckeys, ckeysSizes, family, opts);

    auto result = new string[](raw.length);
    foreach (idx, value; raw) {
      // The buffers are freshly allocated and not shared, so viewing them as
      // immutable text is safe.
      result[idx] = cast(string)value;
    }

    return result;
  }

  /**
   * Applies a write batch to the database.
   *
   * Throws: Exception when RocksDB reports an error.
   */
  void write(WriteBatch batch, WriteOptions opts = null) {
    char* err;
    rocksdb_write(this.db, (opts ? opts : this.writeOptions).opts, batch.batch, &err);
    err.ensureRocks();
  }

  /// Creates an iterator over this database using `opts` or the default read options.
  Iterator iter(ReadOptions opts = null) {
    return new Iterator(this, opts ? opts : this.readOptions);
  }

  /// Runs `dg` with a fresh iterator and destroys the iterator afterwards.
  void withIter(void delegate(Iterator) dg, ReadOptions opts = null) {
    Iterator iter = this.iter(opts);
    scope (exit) destroy(iter);
    dg(iter);
  }

  /**
   * Runs `dg` with a fresh write batch. The batch is written only when `dg`
   * returns without throwing, and is destroyed in either case.
   */
  void withBatch(void delegate(WriteBatch) dg, WriteOptions opts = null) {
    WriteBatch batch = new WriteBatch;
    scope (exit) destroy(batch);
    scope (success) this.write(batch, opts);
    dg(batch);
  }

  /// Closes the database by destroying this object.
  void close() {
    destroy(this);
  }
}

unittest {
  import std.stdio : writefln;
  import std.datetime : benchmark;
  import rocksdb.env : Env;

  writefln("Testing Database");

  auto env = new Env;
  env.backgroundThreads = 2;
  env.highPriorityBackgroundThreads = 1;

  auto opts = new DBOptions;
  opts.createIfMissing = true;
  opts.errorIfExists = false;
  opts.compression = CompressionType.NONE;
  opts.env = env;

  auto db = new Database(opts, "test");

  // Test string putting and getting
  db.putString("key", "value");
  assert(db.getString("key") == "value");
  db.putString("key", "value2");
  assert(db.getString("key") == "value2");

  ubyte[] key = ['\x00', '\x00'];
  ubyte[] value = ['\x01', '\x02'];

  // Test byte based putting / getting
  db.put(key, value);
  assert(db.get(key) == value);
  db.remove(key);

  // Benchmarks

  void writeBench(int times) {
    for (int i = 0; i < times; i++) {
      db.putString(i.to!string, i.to!string);
    }
  }

  void readBench(int times) {
    for (int i = 0; i < times; i++) {
      assert(db.getString(i.to!string) == i.to!string);
    }
  }

  auto writeRes = benchmark!(() => writeBench(100_000))(1);
  writefln("  writing a value 100000 times: %sms", writeRes[0].msecs);

  auto readRes = benchmark!(() => readBench(100_000))(1);
  writefln("  reading a value 100000 times: %sms", readRes[0].msecs);

  // Test batch
  void writeBatchBench(int times) {
    db.withBatch((batch) {
      for (int i = 0; i < times; i++) {
        batch.putString(i.to!string, i.to!string);
      }

      assert(batch.count() == times);
    });
  }

  auto writeBatchRes = benchmark!(() => writeBatchBench(100_000))(1);
  writefln("  batch writing 100000 values: %sms", writeBatchRes[0].msecs);
  readBench(100_000);

  // Test scanning from a location
  bool found = false;
  auto iterFrom = db.iter();
  iterFrom.seek("key");
  // Loop variables are named differently from the `key`/`value` locals above.
  foreach (k, v; iterFrom) {
    assert(v == "value2");
    assert(!found);
    found = true;
  }
  iterFrom.close();
  assert(found);

  found = false;
  int keyCount = 0;
  auto iter = db.iter();

  foreach (k, v; iter) {
    if (k == "key") {
      assert(v == "value2");
      found = true;
    }
    keyCount++;
  }
  iter.close();
  assert(found);
  assert(keyCount == 100001);
  destroy(db);
}