1 module rocksdb.database;
2 
3 import std.stdio : writefln;
4 import std.conv : to;
5 import std.string : fromStringz, toStringz;
6 import std.format : format;
7 import core.stdc.stdlib : cfree = free;
8 import core.stdc.string : strlen;
9 
10 import rocksdb.batch,
11        rocksdb.options,
12        rocksdb.iterator,
13        rocksdb.comparator;
14 
// C-linkage declarations for the subset of the RocksDB C API used by this module.
// Functions taking an `errptr` report failure by storing a C string through it;
// callers in this module pass the address of a null `char*` and test it afterwards.
extern (C) {
  // Empty placeholder type; this module only ever handles it through pointers.
  struct rocksdb_t {}

  // Single-key write and read. Key and value are passed as pointer + length pairs.
  void rocksdb_put(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    const char* key, size_t keylen,
    const char* val, size_t vallen,
    char** errptr);

  // Returns the value buffer and stores its length through `vallen`.
  char* rocksdb_get(
    rocksdb_t* db,
    const rocksdb_readoptions_t* options,
    const char* key, size_t keylen,
    size_t* vallen,
    char** errptr);

  // Applies a whole write batch.
  void rocksdb_write(
    rocksdb_t* db,
    const rocksdb_writeoptions_t* options,
    rocksdb_writebatch_t* batch,
    char** errptr);

  // Handle lifecycle.
  rocksdb_t* rocksdb_open(const rocksdb_options_t* options, const char* name, char** errptr);
  void rocksdb_close(rocksdb_t* db);
}
26 
/**
 * Object wrapper around a `rocksdb_t*` handle.
 *
 * The handle is opened in the constructor and closed in the destructor.
 * Each operation accepts optional per-call options; when none are given the
 * instance-level `readOptions` / `writeOptions` created in the constructor
 * are used instead.
 */
class Database {
  rocksdb_t* db;

  WriteOptions writeOptions;
  ReadOptions readOptions;

  /**
   * Opens the database at `path` with the given options.
   *
   * Throws: Exception ("Failed to open rocksdb: ...") if `rocksdb_open`
   *         reports an error.
   */
  this(DBOptions opts, string path) {
    char* err = null;
    this.db = rocksdb_open(opts.opts, toStringz(path), &err);
    throwOnError(err, "Failed to open rocksdb");

    this.writeOptions = new WriteOptions;
    this.readOptions = new ReadOptions;
  }

  /// Closes the underlying handle if one was opened.
  ~this() {
    if (this.db) {
      rocksdb_close(this.db);
      // Clear the pointer so a stale handle can never be closed twice.
      this.db = null;
    }
  }

  /**
   * Reads the value stored under `key`.
   *
   * Params:
   *   key  = key to look up (passed to the C API with its explicit length)
   *   opts = per-call read options; defaults to `this.readOptions`
   *
   * Returns: a GC-owned copy of the value bytes.
   *          NOTE(review): presumably a missing key comes back as a null
   *          buffer with length 0 and therefore yields an empty string -
   *          confirm against the C API.
   *
   * Throws: Exception ("Failed to get: ...") if `rocksdb_get` reports an error.
   */
  string get(string key, ReadOptions opts=null) {
    size_t len;
    immutable char* ckey = toStringz(key);

    char* err;
    char* value = rocksdb_get(this.db, (opts ? opts : this.readOptions).opts, ckey, key.length, &len, &err);
    // Release the C buffer on every exit path, including a throwing copy below.
    scope (exit) cfree(value);

    throwOnError(err, "Failed to get");

    // Slice by the reported length rather than scanning for a NUL terminator.
    return (value[0..len]).to!string;
  }

  /**
   * Stores `value` under `key`.
   *
   * Params:
   *   key   = key to write
   *   value = value to write
   *   opts  = per-call write options; defaults to `this.writeOptions`
   *
   * Throws: Exception ("Failed to put: ...") if `rocksdb_put` reports an error.
   */
  void put(string key, string value, WriteOptions opts = null) {
    immutable char* ckey = toStringz(key);
    immutable char* cvalue = toStringz(value);

    char* err;
    rocksdb_put(this.db, (opts ? opts : this.writeOptions).opts, ckey, key.length, cvalue, value.length, &err);
    throwOnError(err, "Failed to put");
  }

  /**
   * Applies all operations collected in `batch`.
   *
   * Params:
   *   batch = batch to apply
   *   opts  = per-call write options; defaults to `this.writeOptions`
   *
   * Throws: Exception ("Failed to write: ...") if `rocksdb_write` reports an error.
   */
  void write(WriteBatch batch, WriteOptions opts = null) {
    char* err;
    rocksdb_write(this.db, (opts ? opts : this.writeOptions).opts, batch.batch, &err);
    throwOnError(err, "Failed to write");
  }

  /// Creates an iterator over this database using `opts` or the default read options.
  Iterator iter(ReadOptions opts = null) {
    return new Iterator(this, opts ? opts : this.readOptions);
  }

  /**
   * Converts a C error string into a D exception.
   *
   * Does nothing when `err` is null. Otherwise the message is copied into
   * GC memory, the C buffer is freed (the RocksDB C API hands out
   * malloc()-allocated error messages, so not freeing it leaks), and an
   * Exception of the form "<what>: <message>" is thrown.
   */
  private static void throwOnError(char* err, string what) {
    if (err is null) {
      return;
    }

    // Copy before freeing: fromStringz only returns a slice over the C buffer.
    string message = fromStringz(err).idup;
    cfree(err);

    throw new Exception(format("%s: %s", what, message));
  }
}
94 
// End-to-end test and rough benchmark: single put/get, bulk put/get,
// batched writes, statistics output and a full iteration over the keys.
unittest {
  import std.stdio : writefln;

  auto opts = new DBOptions;
  opts.createIfMissing = true;
  opts.errorIfExists = false;
  opts.compression = CompressionType.NONE;
  opts.enableStatistics();

  auto db = new Database(opts, "test");
  // Close the database even when an assertion below fails.
  scope (exit) destroy(db);

  db.put("key", "value");

  assert(db.get("key") == "value");
  // A second put on the same key must overwrite the first value.
  db.put("key", "value2");
  assert(db.get("key") == "value2");

  // Benchmarks
  import std.datetime : benchmark;

  void writeBench(int times) {
    for (int i = 0; i < times; i++) {
      db.put(i.to!string, i.to!string);
    }
  }

  void readBench(int times) {
    for (int i = 0; i < times; i++) {
      assert(db.get(i.to!string) == i.to!string);
    }
  }

  auto writeRes = benchmark!(() => writeBench(100_000))(1);
  writefln("Writing a value 100000 times: %sms", writeRes[0].msecs);

  auto readRes = benchmark!(() => readBench(100_000))(1);
  writefln("Reading a value 100000 times: %sms", readRes[0].msecs);

  // Test batch
  void writeBatchBench(int times) {
    auto batch = new WriteBatch;

    for (int i = 0; i < times; i++) {
      batch.put(i.to!string, i.to!string);
    }

    assert(batch.count() == times);
    db.write(batch);
  }

  auto writeBatchRes = benchmark!(() => writeBatchBench(100_000))(1);
  writefln("Batch writing 100000 values: %sms", writeBatchRes[0].msecs);
  // The batch rewrote the same keys with the same values, so reads must still match.
  readBench(100_000);

  writefln("%s", opts.getStatisticsString());

  bool found = false;
  int keyCount = 0;
  auto iter = db.iter();
  // Registered after the database guard, so the iterator is destroyed first.
  scope (exit) destroy(iter);

  foreach (key, value; iter) {
    if (key == "key") {
      assert(value == "value2");
      found = true;
    }
    keyCount++;
  }

  assert(found);

  writefln("Keys: %s", keyCount);
  // 100000 numeric keys plus the single "key" entry.
  assert(keyCount == 100001);
}