1 module rocksdb.database;
2 
3 import std.conv : to;
4 import std.file : isDir, exists;
5 import std.array : array;
6 import std.string : fromStringz, toStringz;
7 import std.format : format;
8 
9 import core.memory : GC;
10 import core.stdc.string : strlen;
11 
12 import rocksdb.batch,
13        rocksdb.options,
14        rocksdb.iterator,
15        rocksdb.queryable,
16        rocksdb.comparator,
17        rocksdb.columnfamily;
18 
// Raw declarations for the part of the RocksDB C API that this module calls.
// Functions taking a trailing `char** errptr` report failure by storing a
// C string through it; see ensureRocks for how that is turned into an exception.
extern (C) {
  // Opaque handle type for an open database.
  struct rocksdb_t {}

  // Single-key writes (default column family / explicit column family).
  void rocksdb_put(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    const char* key, size_t keylen,
    const char* val, size_t vallen,
    char** errptr);
  void rocksdb_put_cf(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    rocksdb_column_family_handle_t* column_family,
    const char* key, size_t keylen,
    const char* val, size_t vallen,
    char** errptr);

  // Single-key reads; the value length is stored through `vallen`.
  char* rocksdb_get(
    rocksdb_t* db,
    const rocksdb_readoptions_t* options,
    const char* key, size_t keylen,
    size_t* vallen,
    char** errptr);
  char* rocksdb_get_cf(
    rocksdb_t* db,
    const rocksdb_readoptions_t* options,
    rocksdb_column_family_handle_t* column_family,
    const char* key, size_t keylen,
    size_t* vallen,
    char** errptr);

  // Batched reads; all list arguments are parallel arrays of `num_keys` entries.
  void rocksdb_multi_get(
    rocksdb_t* db,
    const rocksdb_readoptions_t* options,
    size_t num_keys,
    const char** keys_list,
    const size_t* keys_list_sizes,
    char** values_list,
    size_t* values_list_sizes,
    char** errs);
  // NOTE(review): upstream c.h appears to declare `column_families` as
  // `const rocksdb_column_family_handle_t* const*` (one handle per key) —
  // confirm against the linked RocksDB version.
  void rocksdb_multi_get_cf(
    rocksdb_t* db,
    const rocksdb_readoptions_t* options,
    const rocksdb_column_family_handle_t* column_families,
    size_t num_keys,
    const char** keys_list,
    const size_t* keys_list_sizes,
    char** values_list,
    size_t* values_list_sizes,
    char** errs);

  // Single-key deletes.
  void rocksdb_delete(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    const char* key, size_t keylen,
    char** errptr);
  void rocksdb_delete_cf(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    rocksdb_column_family_handle_t* column_family,
    const char* key, size_t keylen,
    char** errptr);

  // Applies a whole write batch.
  void rocksdb_write(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    rocksdb_writebatch_t* batch,
    char** errptr);

  // Opening a database, with or without an explicit list of column families.
  rocksdb_t* rocksdb_open(
    const rocksdb_options_t* options,
    const char* name,
    char** errptr);
  rocksdb_t* rocksdb_open_column_families(
    const rocksdb_options_t* options,
    const char* name,
    int num_column_families,
    const char** column_family_names,
    const rocksdb_options_t** column_family_options,
    rocksdb_column_family_handle_t** column_family_handles,
    char** errptr);

  // Closes a database handle obtained from one of the open functions.
  void rocksdb_close(rocksdb_t* db);
}
42 
/// Converts a RocksDB C-API error string into a D exception.
///
/// Params:
///   err = error pointer filled in by a RocksDB call; `null` means success.
///
/// Throws: `Exception` with the message `"Error: <text>"` when `err` is set.
///
/// The error text is allocated by the C library, so it is released here once
/// its contents have been copied; the pointer must not be used afterwards.
void ensureRocks(char* err) {
  import core.stdc.stdlib : free;

  if (err is null) {
    return;
  }

  // format() copies the text into GC-owned memory, so the C buffer can be
  // freed before the exception is thrown.
  string message = format("Error: %s", fromStringz(err));
  free(err);

  throw new Exception(message);
}
48 
/// High-level wrapper around an open RocksDB database handle.
///
/// The `Getable`, `Putable` and `Removeable` mixins supply the public
/// accessors; this class provides the `getImpl` / `putImpl` / `removeImpl`
/// primitives together with batched reads, write batches and iterators.
class Database {
  mixin Getable;
  mixin Putable;
  mixin Removeable;

  /// Underlying C handle; null once the database has been closed.
  rocksdb_t* db;

  /// Options the database was opened with (also the default for new column families).
  DBOptions opts;
  /// Default options used when a call does not pass its own.
  WriteOptions writeOptions;
  /// ditto
  ReadOptions readOptions;

  /// Column families known to this handle, keyed by name.
  ColumnFamily[string] columnFamilies;

  /// Opens (or creates, depending on `opts`) the database at `path`.
  ///
  /// Params:
  ///   opts           = database-wide options.
  ///   path           = directory of the database.
  ///   columnFamilies = optional per-family options, keyed by family name;
  ///                    families without an entry are opened with `opts`.
  ///
  /// Throws: `Exception` when RocksDB reports an error.
  this(DBOptions opts, string path, DBOptions[string] columnFamilies = null) {
    char* err = null;
    this.opts = opts;

    string[] existingColumnFamilies;

    // An existing database directory can be queried for its column families.
    if (exists(path) && isDir(path)) {
      existingColumnFamilies = Database.listColumnFamilies(opts, path);
    }

    // Only take the column-family path when there is something to open:
    // calling rocksdb_open_column_families with an empty list cannot succeed
    // because the default family would be missing from it.
    if (existingColumnFamilies.length >= 1) {
      immutable(char*)[] columnFamilyNames;
      rocksdb_options_t*[] columnFamilyOptions;

      foreach (k; existingColumnFamilies) {
        columnFamilyNames ~= toStringz(k);

        auto custom = k in columnFamilies;
        if (custom !is null) {
          columnFamilyOptions ~= (*custom).opts;
        } else {
          columnFamilyOptions ~= opts.opts;
        }
      }

      rocksdb_column_family_handle_t*[] result;
      result.length = columnFamilyNames.length;

      this.db = rocksdb_open_column_families(
        opts.opts,
        toStringz(path),
        cast(int)columnFamilyNames.length,
        columnFamilyNames.ptr,
        columnFamilyOptions.ptr,
        result.ptr,
        &err);

      // Check for failure before touching the handle array, which is only
      // meaningful after a successful open.
      err.ensureRocks();

      foreach (idx, handle; result) {
        this.columnFamilies[existingColumnFamilies[idx]] = new ColumnFamily(
          this,
          existingColumnFamilies[idx],
          handle,
        );
      }
    } else {
      this.db = rocksdb_open(opts.opts, toStringz(path), &err);
      err.ensureRocks();
    }

    this.writeOptions = new WriteOptions;
    this.readOptions = new ReadOptions;
  }

  /// Destroys all column family handles and closes the database.
  ~this() {
    // NOTE(review): when this runs from a GC finalizer rather than an explicit
    // destroy()/close(), the ColumnFamily objects referenced here may already
    // have been finalized — prefer calling close() explicitly.
    if (this.db) {
      foreach (k, v; this.columnFamilies) {
        rocksdb_column_family_handle_destroy(v.cf);
      }

      rocksdb_close(this.db);
      // Guard against the handle being closed a second time.
      this.db = null;
    }
  }

  /// Creates a new column family and registers it in `columnFamilies`.
  ///
  /// Params:
  ///   name = name of the new family.
  ///   opts = options for the family; the database options are used when null.
  ///
  /// Returns: the wrapper for the newly created family.
  /// Throws: `Exception` when RocksDB reports an error.
  ColumnFamily createColumnFamily(string name, DBOptions opts = null) {
    char* err = null;

    auto cfh = rocksdb_create_column_family(this.db, (opts ? opts : this.opts).opts, toStringz(name), &err);
    err.ensureRocks();

    auto created = new ColumnFamily(this, name, cfh);
    this.columnFamilies[name] = created;
    return created;
  }

  /// Lists the names of the column families stored in the database at `path`.
  ///
  /// Throws: `Exception` when RocksDB reports an error.
  static string[] listColumnFamilies(DBOptions opts, string path) {
    char* err = null;
    size_t numColumnFamilies;

    char** columnFamilies = rocksdb_list_column_families(
      opts.opts,
      toStringz(path),
      &numColumnFamilies,
      &err);

    err.ensureRocks();

    string[] result = new string[](numColumnFamilies);

    // Copy every name into GC memory before the C list is destroyed below.
    foreach (i; 0 .. numColumnFamilies) {
      result[i] = fromStringz(columnFamilies[i]).to!string;
    }

    rocksdb_list_column_families_destroy(columnFamilies, numColumnFamilies);

    return result;
  }

  /// Copies a buffer returned by the C API into GC memory and frees the original.
  ///
  /// Returns: `null` when `raw` is null, otherwise a GC-owned copy of
  /// `raw[0 .. len]`.
  private static ubyte[] takeBuffer(char* raw, size_t len) {
    import core.stdc.stdlib : free;

    if (raw is null) {
      return null;
    }
    scope (exit) free(raw);

    // One spare byte keeps the slice non-null even for a zero-length value,
    // so "present but empty" stays distinguishable from "absent".
    ubyte[] copy = new ubyte[](len + 1);
    copy[0 .. len] = (cast(ubyte*) raw)[0 .. len];
    return copy[0 .. len];
  }

  /// Reads the value stored under `key`.
  ///
  /// Params:
  ///   key    = raw key bytes.
  ///   family = column family to read from; null selects the plain `rocksdb_get`.
  ///   opts   = read options; the database default is used when null.
  ///
  /// Returns: a GC-owned copy of the value, or `null` when the C call
  /// returned no buffer.
  /// Throws: `Exception` when RocksDB reports an error.
  ubyte[] getImpl(ubyte[] key, ColumnFamily family, ReadOptions opts = null) {
    import core.stdc.stdlib : free;

    size_t len;
    char* err;
    char* raw;

    if (family) {
      raw = rocksdb_get_cf(
        this.db,
        (opts ? opts : this.readOptions).opts,
        family.cf,
        cast(char*)key.ptr,
        key.length,
        &len,
        &err);
    } else {
      raw = rocksdb_get(
        this.db,
        (opts ? opts : this.readOptions).opts,
        cast(char*)key.ptr,
        key.length,
        &len,
        &err);
    }

    if (err !is null) {
      // Do not leak a value buffer on the error path (free(null) is a no-op).
      free(raw);
      err.ensureRocks();
    }

    return takeBuffer(raw, len);
  }

  /// Stores `value` under `key`.
  ///
  /// Params:
  ///   key    = raw key bytes.
  ///   value  = raw value bytes.
  ///   family = column family to write to; null selects the plain `rocksdb_put`.
  ///   opts   = write options; the database default is used when null.
  ///
  /// Throws: `Exception` when RocksDB reports an error.
  void putImpl(ubyte[] key, ubyte[] value, ColumnFamily family, WriteOptions opts = null) {
    char* err;

    if (family) {
      rocksdb_put_cf(this.db,
        (opts ? opts : this.writeOptions).opts,
        family.cf,
        cast(char*)key.ptr, key.length,
        cast(char*)value.ptr, value.length,
        &err);
    } else {
      rocksdb_put(this.db,
        (opts ? opts : this.writeOptions).opts,
        cast(char*)key.ptr, key.length,
        cast(char*)value.ptr, value.length,
        &err);
    }

    err.ensureRocks();
  }

  /// Deletes the entry stored under `key`.
  ///
  /// Params:
  ///   key    = raw key bytes.
  ///   family = column family to delete from; null selects the plain `rocksdb_delete`.
  ///   opts   = write options; the database default is used when null.
  ///
  /// Throws: `Exception` when RocksDB reports an error.
  void removeImpl(ubyte[] key, ColumnFamily family, WriteOptions opts = null) {
    char* err;

    if (family) {
      rocksdb_delete_cf(
        this.db,
        (opts ? opts : this.writeOptions).opts,
        family.cf,
        cast(char*)key.ptr,
        key.length,
        &err);
    } else {
      rocksdb_delete(
        this.db,
        (opts ? opts : this.writeOptions).opts,
        cast(char*)key.ptr,
        key.length,
        &err);
    }

    err.ensureRocks();
  }

  /// Shared implementation of the batched lookups.
  ///
  /// Params:
  ///   ckeys      = pointers to the key bytes.
  ///   ckeysSizes = length of each key, parallel to `ckeys`.
  ///   family     = column family to read from, or null.
  ///   opts       = read options, or null for the database default.
  ///
  /// Returns: one GC-owned value per key (`null` where no value was returned).
  /// Throws: `Exception` for the first key that RocksDB reported an error for.
  private ubyte[][] multiGetRaw(char*[] ckeys, size_t[] ckeysSizes, ColumnFamily family, ReadOptions opts) {
    import core.stdc.stdlib : free;

    size_t count = ckeys.length;
    ubyte[][] result = new ubyte[][](count);
    if (count == 0) {
      return result;
    }

    char*[] vals = new char*[](count);
    size_t[] valsSizes = new size_t[](count);
    char*[] errs = new char*[](count);

    if (family) {
      // The C API takes one column family handle per key, so the single
      // handle is replicated. The cast only adapts the pointer to the
      // parameter type used by this module's declaration.
      rocksdb_column_family_handle_t*[] handles = new rocksdb_column_family_handle_t*[](count);
      handles[] = family.cf;

      rocksdb_multi_get_cf(
        this.db,
        (opts ? opts : this.readOptions).opts,
        cast(const(rocksdb_column_family_handle_t)*) handles.ptr,
        count,
        ckeys.ptr,
        ckeysSizes.ptr,
        vals.ptr,
        valsSizes.ptr,
        errs.ptr);
    } else {
      rocksdb_multi_get(
        this.db,
        (opts ? opts : this.readOptions).opts,
        count,
        ckeys.ptr,
        ckeysSizes.ptr,
        vals.ptr,
        valsSizes.ptr,
        errs.ptr);
    }

    // Take ownership of every returned buffer before reporting anything, so
    // that a failure for one key does not leak the buffers of the others.
    char* firstErr = null;
    foreach (idx, rawErr; errs) {
      if (rawErr !is null) {
        if (firstErr is null) {
          firstErr = rawErr;
        } else {
          free(rawErr);
        }
        free(vals[idx]);
        continue;
      }

      result[idx] = takeBuffer(vals[idx], valsSizes[idx]);
    }

    firstErr.ensureRocks();
    return result;
  }

  /// Looks up several keys in one call.
  ///
  /// Returns: the values in the same order as `keys`.
  /// Throws: `Exception` when RocksDB reports an error for any key.
  ubyte[][] multiGet(ubyte[][] keys, ColumnFamily family = null, ReadOptions opts = null) {
    char*[] ckeys = new char*[](keys.length);
    size_t[] ckeysSizes = new size_t[](keys.length);

    foreach (idx, key; keys) {
      ckeys[idx] = cast(char*)key.ptr;
      ckeysSizes[idx] = key.length;
    }

    return this.multiGetRaw(ckeys, ckeysSizes, family, opts);
  }

  /// String flavour of `multiGet`.
  ///
  /// Returns: the values in the same order as `keys`.
  /// Throws: `Exception` when RocksDB reports an error for any key.
  string[] multiGetString(string[] keys, ColumnFamily family = null, ReadOptions opts = null) {
    char*[] ckeys = new char*[](keys.length);
    size_t[] ckeysSizes = new size_t[](keys.length);

    foreach (idx, key; keys) {
      ckeys[idx] = cast(char*)key.ptr;
      ckeysSizes[idx] = key.length;
    }

    ubyte[][] raw = this.multiGetRaw(ckeys, ckeysSizes, family, opts);

    string[] result = new string[](raw.length);
    foreach (idx, value; raw) {
      // Each buffer is a fresh private copy, so viewing it as immutable is safe.
      result[idx] = cast(string)value;
    }

    return result;
  }

  /// Applies all operations recorded in `batch`.
  ///
  /// Throws: `Exception` when RocksDB reports an error.
  void write(WriteBatch batch, WriteOptions opts = null) {
    char* err;
    rocksdb_write(this.db, (opts ? opts : this.writeOptions).opts, batch.batch, &err);
    err.ensureRocks();
  }

  /// Creates an iterator over the database using `opts` or the default read options.
  Iterator iter(ReadOptions opts = null) {
    return new Iterator(this, opts ? opts : this.readOptions);
  }

  /// Runs `dg` with a fresh iterator that is destroyed when `dg` returns or throws.
  void withIter(void delegate(Iterator) dg, ReadOptions opts = null) {
    Iterator iter = this.iter(opts);
    scope (exit) destroy(iter);
    dg(iter);
  }

  /// Runs `dg` with a fresh write batch; the batch is written only if `dg`
  /// completes without throwing, and is destroyed in either case.
  void withBatch(void delegate(WriteBatch) dg, WriteOptions opts = null) {
    WriteBatch batch = new WriteBatch;
    // Scope guards run in reverse order: write on success first, then destroy.
    scope (exit) destroy(batch);
    scope (success) this.write(batch, opts);
    dg(batch);
  }

  /// Closes the database by destroying this object.
  void close() {
    destroy(this);
  }
}
351 
// End-to-end check of Database: string and byte round trips, simple
// write/read/batch timings, and iteration from a seek position and from the start.
unittest {
  import std.stdio : writefln;
  import std.datetime : benchmark;
  import rocksdb.env : Env;

  writefln("Testing Database");

  auto environment = new Env;
  environment.backgroundThreads = 2;
  environment.highPriorityBackgroundThreads = 1;

  auto options = new DBOptions;
  options.createIfMissing = true;
  options.errorIfExists = false;
  options.compression = CompressionType.NONE;
  options.env = environment;

  auto db = new Database(options, "test");

  // Strings: a second put under the same key replaces the first value.
  db.putString("key", "value");
  assert(db.getString("key") == "value");
  db.putString("key", "value2");
  assert(db.getString("key") == "value2");

  ubyte[] rawKey = ['\x00', '\x00'];
  ubyte[] rawValue = ['\x01', '\x02'];

  // Raw bytes: put, read back, then remove again.
  db.put(rawKey, rawValue);
  assert(db.get(rawKey) == rawValue);
  db.remove(rawKey);

  // Timed helpers: every number is stored under its own decimal representation.

  void writeBench(int times) {
    foreach (i; 0 .. times) {
      db.putString(i.to!string, i.to!string);
    }
  }

  void readBench(int times) {
    foreach (i; 0 .. times) {
      assert(db.getString(i.to!string) == i.to!string);
    }
  }

  auto writeRes = benchmark!(() => writeBench(100_000))(1);
  writefln("  writing a value 100000 times: %sms", writeRes[0].msecs);

  auto readRes = benchmark!(() => readBench(100_000))(1);
  writefln("  reading a value 100000 times: %sms", readRes[0].msecs);

  // Same data again, this time collected in a single write batch.
  void writeBatchBench(int times) {
    db.withBatch((batch) {
      foreach (i; 0 .. times) {
        batch.putString(i.to!string, i.to!string);
      }

      assert(batch.count() == times);
    });
  }

  auto writeBatchRes = benchmark!(() => writeBatchBench(100_000))(1);
  writefln("  batch writing 100000 values: %sms", writeBatchRes[0].msecs);
  readBench(100_000);

  // Iterating after seeking to "key" must yield exactly one entry.
  bool found = false;
  auto iterFrom = db.iter();
  iterFrom.seek("key");
  foreach (k, v; iterFrom) {
    assert(v == "value2");
    assert(!found);
    found = true;
  }
  iterFrom.close();
  assert(found);

  // A full scan sees the 100000 numeric keys plus "key".
  found = false;
  int keyCount = 0;
  auto iter = db.iter();

  foreach (k, v; iter) {
    if (k == "key") {
      assert(v == "value2");
      found = true;
    }
    keyCount++;
  }
  iter.close();
  assert(found);
  assert(keyCount == 100001);
  destroy(db);
}