Skip to content

Commit

Permalink
Implements TransactionDB with WriteBatch.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanyuanzhao3 committed Dec 15, 2022
1 parent 5375896 commit 1164ce9
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
11 changes: 11 additions & 0 deletions transactiondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ func (db *TransactionDB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key,
return nil
}

// Write writes a WriteBatch to the database
func (db *TransactionDB) Write(opts *WriteOptions, batch *WriteBatch) error {
var cErr *C.char
C.rocksdb_transactiondb_write(db.c, opts.c, batch.c, &cErr)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}

// Delete removes the data associated with the key from the database.
func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error {
var (
Expand Down
64 changes: 63 additions & 1 deletion transactiondb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gorocksdb

import (
"fmt"
"io/ioutil"
"testing"

Expand Down Expand Up @@ -135,10 +136,71 @@ func TestTransactionDBCRUD(t *testing.T) {

}

func TestTransactionDBWriteBatchColumnFamilies(t *testing.T) {
test_cf_names := []string{"default", "cf1", "cf2"}
db, cf_handles := newTestTransactionDBColumnFamilies(t, "TestOpenTransactionDbColumnFamilies", test_cf_names)
ensure.True(t, len(cf_handles) == 3)
defer db.Close()

var (
wo = NewDefaultWriteOptions()
ro = NewDefaultReadOptions()
)

// WriteBatch PutCF
{
batch := NewWriteBatch()
for h_idx := 1; h_idx <= 2; h_idx++ {
for k_idx := 0; k_idx <= 2; k_idx++ {
batch.PutCF(cf_handles[h_idx], []byte(fmt.Sprintf("%s_key_%d", test_cf_names[h_idx], k_idx)),
[]byte(fmt.Sprintf("%s_value_%d", test_cf_names[h_idx], k_idx)))
}
}
ensure.Nil(t, db.Write(wo, batch))
batch.Destroy()
}

// Read back
{
for h_idx := 1; h_idx <= 2; h_idx++ {
for k_idx := 0; k_idx <= 2; k_idx++ {
data, err := db.GetCF(ro, cf_handles[h_idx], []byte(fmt.Sprintf("%s_key_%d", test_cf_names[h_idx], k_idx)))
ensure.Nil(t, err)
ensure.DeepEqual(t, data.Data(), []byte(fmt.Sprintf("%s_value_%d", test_cf_names[h_idx], k_idx)))
}
}
}

// WriteBatch DeleteCF (DeleteRangeCF not implemented)
{
batch := NewWriteBatch()
batch.DeleteCF(cf_handles[1], []byte(test_cf_names[1]+"_key_0"))
batch.DeleteCF(cf_handles[1], []byte(test_cf_names[1]+"_key_1"))
ensure.Nil(t, db.Write(wo, batch))
}

// Read back the remaining keys
{
// All keys on "cf2" are still there.
// Only key2 on "cf1" still remains
for h_idx := 1; h_idx <= 2; h_idx++ {
for k_idx := 0; k_idx <= 2; k_idx++ {
data, err := db.GetCF(ro, cf_handles[h_idx], []byte(fmt.Sprintf("%s_key_%d", test_cf_names[h_idx], k_idx)))
ensure.Nil(t, err)
if h_idx == 2 || k_idx == 2 {
ensure.DeepEqual(t, data.Data(), []byte(fmt.Sprintf("%s_value_%d", test_cf_names[h_idx], k_idx)))
} else {
ensure.True(t, len(data.Data()) == 0)
}
}
}
}
}

func TestTransactionDBCRUDColumnFamilies(t *testing.T) {
test_cf_names := []string{"default", "cf1", "cf2"}
db, cf_handles := newTestTransactionDBColumnFamilies(t, "TestOpenTransactionDbColumnFamilies", test_cf_names)
ensure.True(t, 3 == len(cf_handles))
ensure.True(t, len(cf_handles) == 3)
defer db.Close()

var (
Expand Down

0 comments on commit 1164ce9

Please sign in to comment.