From 1164ce92b8741d3f1bd8903d9e7c0736a07dd2eb Mon Sep 17 00:00:00 2001 From: Yuanyuan Zhao Date: Wed, 14 Dec 2022 22:41:24 -0500 Subject: [PATCH] Implements TransactionDB with WriteBatch. --- transactiondb.go | 11 ++++++++ transactiondb_test.go | 64 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/transactiondb.go b/transactiondb.go index a83db05..e8a7309 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -213,6 +213,17 @@ func (db *TransactionDB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, return nil } +// Write writes a WriteBatch to the database +func (db *TransactionDB) Write(opts *WriteOptions, batch *WriteBatch) error { + var cErr *C.char + C.rocksdb_transactiondb_write(db.c, opts.c, batch.c, &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // Delete removes the data associated with the key from the database. func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { var ( diff --git a/transactiondb_test.go b/transactiondb_test.go index 490f240..3017ca4 100644 --- a/transactiondb_test.go +++ b/transactiondb_test.go @@ -1,6 +1,7 @@ package gorocksdb import ( + "fmt" "io/ioutil" "testing" @@ -135,10 +136,71 @@ func TestTransactionDBCRUD(t *testing.T) { } +func TestTransactionDBWriteBatchColumnFamilies(t *testing.T) { + test_cf_names := []string{"default", "cf1", "cf2"} + db, cf_handles := newTestTransactionDBColumnFamilies(t, "TestOpenTransactionDbColumnFamilies", test_cf_names) + ensure.True(t, len(cf_handles) == 3) + defer db.Close() + + var ( + wo = NewDefaultWriteOptions() + ro = NewDefaultReadOptions() + ) + + // WriteBatch PutCF + { + batch := NewWriteBatch() + for h_idx := 1; h_idx <= 2; h_idx++ { + for k_idx := 0; k_idx <= 2; k_idx++ { + batch.PutCF(cf_handles[h_idx], []byte(fmt.Sprintf("%s_key_%d", test_cf_names[h_idx], k_idx)), + []byte(fmt.Sprintf("%s_value_%d", test_cf_names[h_idx], k_idx))) + } + } + ensure.Nil(t, db.Write(wo, batch)) + batch.Destroy() + } + + // Read back + { + for h_idx := 1; h_idx <= 2; h_idx++ { + for k_idx := 0; k_idx <= 2; k_idx++ { + data, err := db.GetCF(ro, cf_handles[h_idx], []byte(fmt.Sprintf("%s_key_%d", test_cf_names[h_idx], k_idx))) + ensure.Nil(t, err) + ensure.DeepEqual(t, data.Data(), []byte(fmt.Sprintf("%s_value_%d", test_cf_names[h_idx], k_idx))) + } + } + } + + // WriteBatch DeleteCF (DeleteRangeCF not implemented) + { + batch := NewWriteBatch() + batch.DeleteCF(cf_handles[1], []byte(test_cf_names[1]+"_key_0")) + batch.DeleteCF(cf_handles[1], []byte(test_cf_names[1]+"_key_1")) + ensure.Nil(t, db.Write(wo, batch)) + } + + // Read back the remaining keys + { + // All keys on "cf2" are still there. + // Only key2 on "cf1" still remains + for h_idx := 1; h_idx <= 2; h_idx++ { + for k_idx := 0; k_idx <= 2; k_idx++ { + data, err := db.GetCF(ro, cf_handles[h_idx], []byte(fmt.Sprintf("%s_key_%d", test_cf_names[h_idx], k_idx))) + ensure.Nil(t, err) + if h_idx == 2 || k_idx == 2 { + ensure.DeepEqual(t, data.Data(), []byte(fmt.Sprintf("%s_value_%d", test_cf_names[h_idx], k_idx))) + } else { + ensure.True(t, len(data.Data()) == 0) + } + } + } + } +} + func TestTransactionDBCRUDColumnFamilies(t *testing.T) { test_cf_names := []string{"default", "cf1", "cf2"} db, cf_handles := newTestTransactionDBColumnFamilies(t, "TestOpenTransactionDbColumnFamilies", test_cf_names) - ensure.True(t, 3 == len(cf_handles)) + ensure.True(t, len(cf_handles) == 3) defer db.Close() var (