Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/binding/transaction.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <chrono>
#include <sstream>
#include <thread>
#include "database.h"
Expand Down Expand Up @@ -140,7 +141,12 @@ napi_value Transaction::Abort(napi_env env, napi_callback_info info) {
/**
* State for the `Commit` async work.
*/
typedef BaseAsyncState<std::shared_ptr<TransactionHandle>> TransactionCommitState;
struct TransactionCommitState final : BaseAsyncState<std::shared_ptr<TransactionHandle>> {
TransactionCommitState(napi_env env, std::shared_ptr<TransactionHandle> txnHandle)
: BaseAsyncState<std::shared_ptr<TransactionHandle>>(env, txnHandle), timestamp(0) {}

uint64_t timestamp;
};

/**
* Commits the transaction.
Expand Down Expand Up @@ -186,8 +192,14 @@ napi_value Transaction::Commit(napi_env env, napi_callback_info info) {
if (!state->handle || !state->handle->dbHandle || !state->handle->dbHandle->opened() || state->handle->dbHandle->isCancelled()) {
state->status = rocksdb::Status::Aborted("Database closed during transaction commit operation");
} else {
// set current timestamp before committing
auto now = std::chrono::system_clock::now();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this should be part of this PR or a subsequent PR for transaction log, but:

  • The timestamp should be unique; we need to track the last commit timestamp used, and if it is identical to (or greater than) now, we need to increment it by the smallest increment possible in a double representation (which will require a mutex).
  • There should be an option to specify the commit timestamp, presumably from the TransactionOptions that are passed in. This is important for situations like replication where we are replicated the state of transaction that has already been committed.

auto timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
state->handle->txn->SetCommitTimestamp(timestamp);

state->status = state->handle->txn->Commit();
if (state->status.ok()) {
state->timestamp = timestamp;
DEBUG_LOG("Transaction::Commit emitted committed event\n")
state->handle->state = TransactionState::Committed;
state->handle->dbHandle->descriptor->notify(env, "committed", nullptr);
Expand All @@ -207,17 +219,20 @@ napi_value Transaction::Commit(napi_env env, napi_callback_info info) {
if (state->status.ok()) {
DEBUG_LOG("Transaction::Commit complete closing handle=%p\n", state->handle.get())

// BUG!
if (state->handle) {
state->handle->close();
} else {
DEBUG_LOG("Transaction::Commit complete, but handle is null!\n")
}

napi_value result;
double milliseconds = static_cast<double>(state->timestamp) / 1000.0;
NAPI_STATUS_THROWS_VOID(::napi_create_double(env, milliseconds, &result))

DEBUG_LOG("Transaction::Commit complete calling resolve\n")
napi_value resolve;
NAPI_STATUS_THROWS_VOID(::napi_get_reference_value(env, state->resolveRef, &resolve))
NAPI_STATUS_THROWS_VOID(::napi_call_function(env, global, resolve, 0, nullptr, nullptr))
NAPI_STATUS_THROWS_VOID(::napi_call_function(env, global, resolve, 1, &result, nullptr))
} else {
napi_value reject;
napi_value error;
Expand Down Expand Up @@ -257,6 +272,11 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {
}
(*txnHandle)->state = TransactionState::Committing;

// set current timestamp before committing
auto now = std::chrono::system_clock::now();
auto timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
(*txnHandle)->txn->SetCommitTimestamp(timestamp);

rocksdb::Status status = (*txnHandle)->txn->Commit();
if (status.ok()) {
DEBUG_LOG("Transaction::CommitSync emitted committed event\n")
Expand All @@ -265,13 +285,18 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {

DEBUG_LOG("Transaction::CommitSync closing txnHandle=%p\n", (*txnHandle).get())
(*txnHandle)->close();
} else {
napi_value error;
ROCKSDB_CREATE_ERROR_LIKE_VOID(error, status, "Transaction commit failed")
NAPI_STATUS_THROWS(::napi_throw(env, error))

// Return the timestamp as milliseconds (convert from microseconds)
napi_value result;
double milliseconds = static_cast<double>(timestamp) / 1000.0;
NAPI_STATUS_THROWS(::napi_create_double(env, milliseconds, &result))
return result;
}

NAPI_RETURN_UNDEFINED()
napi_value error;
ROCKSDB_CREATE_ERROR_LIKE_VOID(error, status, "Transaction commit failed")
NAPI_STATUS_THROWS(::napi_throw(env, error))
return nullptr;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/load-binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ export type NativeTransaction = {
id: number;
new(context: NativeDatabase, options?: TransactionOptions): NativeTransaction;
abort(): void;
commit(resolve: () => void, reject: (err: Error) => void): void;
commitSync(): void;
commit(resolve: (timestamp: number) => void, reject: (err: Error) => void): void;
commitSync(): number;
get(key: Key, resolve: (value: Buffer) => void, reject: (err: Error) => void): number;
getCount(options?: RangeOptions): number;
getSync(key: Key): Buffer;
Expand Down
8 changes: 4 additions & 4 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export class Transaction extends DBI {
/**
* Commit the transaction.
*/
async commit(): Promise<void> {
async commit(): Promise<number> {
try {
await new Promise<void>((resolve, reject) => {
return await new Promise<number>((resolve, reject) => {
this.notify('beforecommit');
this.#txn.commit(resolve, reject);
});
Expand All @@ -45,10 +45,10 @@ export class Transaction extends DBI {
}
}

commitSync(): void {
commitSync(): number {
try {
this.notify('beforecommit');
this.#txn.commitSync();
return this.#txn.commitSync();
} finally {
this.notify('aftercommit', {
next: null,
Expand Down
88 changes: 86 additions & 2 deletions test/transactions.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { assert, describe, expect, it } from 'vitest';
import { rimraf } from 'rimraf';
import { RocksDatabase } from '../src/index.js';
import { RocksDatabase, Transaction } from '../src/index.js';
import { generateDBPath } from './lib/util.js';
import type { Transaction } from '../src/transaction.js';
import { withResolvers } from '../src/util.js';

const testOptions = [
Expand Down Expand Up @@ -476,6 +475,21 @@ for (const { name, options, txnOptions } of testOptions) {
await rimraf(dbPath);
}
});

it(`${name} async should return the callback's value`, async () => {
let db: RocksDatabase | null = null;
const dbPath = generateDBPath();

try {
db = RocksDatabase.open(dbPath, options);
await expect(db.transaction(async (_txn: Transaction) => {
return 'foo'
}, txnOptions)).resolves.toBe('foo');
} finally {
db?.close();
await rimraf(dbPath);
}
});
});

describe(`transactionSync() (${name})`, () => {
Expand Down Expand Up @@ -804,6 +818,36 @@ for (const { name, options, txnOptions } of testOptions) {
await rimraf(dbPath);
}
});

it(`${name} sync should return the callback's value`, async () => {
let db: RocksDatabase | null = null;
const dbPath = generateDBPath();

try {
db = RocksDatabase.open(dbPath, options);
expect(db.transactionSync((_txn: Transaction) => {
return 'foo'
}, txnOptions)).toBe('foo');
} finally {
db?.close();
await rimraf(dbPath);
}
});

it(`${name} sync should return the callback's promise`, async () => {
let db: RocksDatabase | null = null;
const dbPath = generateDBPath();

try {
db = RocksDatabase.open(dbPath, options);
await expect(db.transactionSync(async (_txn: Transaction) => {
return 'foo'
}, txnOptions)).resolves.toBe('foo');
} finally {
db?.close();
await rimraf(dbPath);
}
});
});

describe(`Error handling (${name})`, () => {
Expand Down Expand Up @@ -834,3 +878,43 @@ for (const { name, options, txnOptions } of testOptions) {
});
});
}

describe('Transaction', () => {
it('should return the commit timestamp sync', async () => {
let db: RocksDatabase | null = null;
const dbPath = generateDBPath();

try {
db = RocksDatabase.open(dbPath);
const start = Date.now();
const txn = new Transaction(db.store);
txn.putSync('foo', 'bar');
const ts = txn.commitSync();
expect(ts).toBeGreaterThan(start);
expect(ts).toBeLessThanOrEqual(Date.now() + 1);
expect(db.getSync('foo')).toBe('bar');
} finally {
db?.close();
await rimraf(dbPath);
}
});

it('should resolve the commit timestamp async', async () => {
let db: RocksDatabase | null = null;
const dbPath = generateDBPath();

try {
db = RocksDatabase.open(dbPath);
const start = Date.now();
const txn = new Transaction(db.store);
txn.putSync('foo', 'bar');
const ts = await txn.commit();
expect(ts).toBeGreaterThan(start);
expect(ts).toBeLessThanOrEqual(Date.now() + 1);
expect(db.getSync('foo')).toBe('bar');
} finally {
db?.close();
await rimraf(dbPath);
}
});
});
Loading