1 module rocksdb.columnfamily;
2 
3 import rocksdb.options : rocksdb_options_t, ReadOptions, WriteOptions;
4 import rocksdb.iterator : Iterator;
5 import rocksdb.database : Database, rocksdb_t, ensureRocks;
6 import rocksdb.queryable : Getable, Putable, Removeable;
7 
8 extern (C) {
9   struct rocksdb_column_family_handle_t {};
10 
11   char** rocksdb_list_column_families(const rocksdb_options_t*, const char*, size_t*, char**);
12   void rocksdb_list_column_families_destroy(char**, size_t);
13 
14   rocksdb_column_family_handle_t* rocksdb_create_column_family(rocksdb_t*, const rocksdb_options_t*, const char*, char**);
15   void rocksdb_drop_column_family(rocksdb_t*, rocksdb_column_family_handle_t*, char**);
16   void rocksdb_column_family_handle_destroy(rocksdb_column_family_handle_t*);
17 }
18 
19 class ColumnFamily {
20   mixin Getable;
21   mixin Putable;
22   mixin Removeable;
23 
24   Database db;
25   string name;
26   rocksdb_column_family_handle_t* cf;
27 
28   this(Database db, string name, rocksdb_column_family_handle_t* cf) {
29     this.db = db;
30     this.name = name;
31     this.cf = cf;
32   }
33 
34   Iterator iter(ReadOptions opts = null) {
35     return new Iterator(this.db, this, opts ? opts : this.db.readOptions);
36   }
37 
38   void withIter(void delegate(Iterator) dg, ReadOptions opts = null) {
39     Iterator iter = this.iter(opts);
40     scope (exit) destroy(iter);
41     dg(iter);
42   }
43 
44   void drop() {
45     char* err = null;
46     rocksdb_drop_column_family(this.db.db, this.cf, &err);
47     err.ensureRocks();
48   }
49 
50   ubyte[][] multiGet(ubyte[][] keys, ReadOptions opts = null) {
51     return this.db.multiGet(keys, this, opts);
52   }
53 
54   string[] multiGetString(string[] keys, ReadOptions opts = null) {
55     return this.db.multiGetString(keys, this, opts);
56   }
57 
58   ubyte[] getImpl(ubyte[] key, ColumnFamily family, ReadOptions opts = null) {
59     assert(family == this || family is null);
60     return this.db.getImpl(key, this, opts);
61   }
62 
63   void putImpl(ubyte[] key, ubyte[] value, ColumnFamily family, WriteOptions opts = null) {
64     assert(family == this || family is null);
65     this.db.putImpl(key, value, this, opts);
66   }
67 
68   void removeImpl(ubyte[] key, ColumnFamily family, WriteOptions opts = null) {
69     assert(family == this || family is null);
70     this.db.removeImpl(key, this, opts);
71   }
72 }
73 
74 unittest {
75   import std.stdio : writefln;
76   import std.datetime : benchmark;
77   import std.conv : to;
78   import std.algorithm.searching : startsWith;
79   import rocksdb.options : DBOptions, CompressionType;
80 
81   writefln("Testing Column Families");
82 
83   // DB Options
84   auto opts = new DBOptions;
85   opts.createIfMissing = true;
86   opts.errorIfExists = false;
87   opts.compression = CompressionType.NONE;
88 
89   // Create the database (if it does not exist)
90   auto db = new Database(opts, "test");
91 
92   string[] columnFamilies = [
93     "test",
94     "test1",
95     "test2",
96     "test3",
97     "test4",
98     "wow",
99   ];
100 
101   // create a bunch of column families
102   foreach (cf; columnFamilies) {
103     if ((cf in db.columnFamilies) is null) {
104       db.createColumnFamily(cf);
105     }
106   }
107 
108   db.close();
109   db = new Database(opts, "test");
110   scope (exit) destroy(db);
111 
112   // Test column family listing
113   assert(Database.listColumnFamilies(opts, "test").length == columnFamilies.length + 1);
114 
115   void testColumnFamily(ColumnFamily cf, int times) {
116     for (int i = 0; i < times; i++) {
117       cf.putString(cf.name ~ i.to!string, i.to!string);
118     }
119 
120     for (int i = 0; i < times; i++) {
121       assert(cf.getString(cf.name ~ i.to!string) == i.to!string);
122     }
123 
124     cf.withIter((iter) {
125       foreach (key, value; iter) {
126         assert(key.startsWith(cf.name));
127       }
128     });
129   }
130 
131   foreach (name, cf; db.columnFamilies) {
132     if (name == "default") continue;
133 
134     writefln("  %s", name);
135     testColumnFamily(cf, 1000);
136   }
137 }